Connection refused while trying to run a Storm topology to consume messages from Kafka
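I have a Kafka broker running on my localhost. The ZooKeeper instance that ships with Kafka is also running on my localhost.
I created a simple topology that consumes messages from the Kafka broker. However, when I run the topology, I get a connection refused exception.
Below is the code for my topology: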
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class KafkaTopology
{
    private static final String KAFKA_SPOUT_ID = "kafkaSpout";
    private static final String KAFKA_BOLT_ID = "kafkaBolt";

    public KafkaTopology(){
    }

    private SpoutConfig constructKafkaSpoutConf()
    {
        BrokerHosts hosts = new ZkHosts("localhost:2181");
        String topic = "topic2";
        String zkRoot = "/topic2";
        String consumerGroupId = "StormSpout";
        SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, consumerGroupId);
        return spoutConfig;
    }

    public void configureKafkaSpout(TopologyBuilder builder)
    {
        KafkaSpout kafkaSpout = new KafkaSpout(constructKafkaSpoutConf());
        builder.setSpout(KAFKA_SPOUT_ID, kafkaSpout);
    }

    public void configureKafkaBolt(TopologyBuilder builder)
    {
        KafkaBolt kafkaBolt = new KafkaBolt();
        builder.setBolt(KAFKA_BOLT_ID, kafkaBolt).globalGrouping(KAFKA_SPOUT_ID);
    }

    private void buildAndSubmit() throws Exception
    {
        TopologyBuilder builder = new TopologyBuilder();
        configureKafkaSpout(builder);
        configureKafkaBolt(builder);
        Config conf = new Config();
        conf.setDebug(true);
        StormSubmitter.submitTopology("kafka-processor",
                conf, builder.createTopology());
    }

    public static void main(String[] str) throws Exception
    {
        KafkaTopology kafkaTopology = new KafkaTopology();
        kafkaTopology.buildAndSubmit();
    }
}
The exception I get is as follows:
Exception in thread "main" java.lang.RuntimeException: org.apache.thrift7.transport.TTransportException: java.net.ConnectException: Connection refused
    at backtype.storm.utils.NimbusClient.getConfiguredClient(NimbusClient.java:38)
    at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:116)
    at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:70)
    at udacity.storm.KafkaTopology.buildAndSubmit(KafkaTopology.java:66)
    at udacity.storm.KafkaTopology.main(KafkaTopology.java:73)
Caused by: org.apache.thrift7.transport.TTransportException: java.net.ConnectException: Connection refused
    at org.apache.thrift7.transport.TSocket.open(TSocket.java:183)
    at org.apache.thrift7.transport.TFramedTransport.open(TFramedTransport.java:81)
    at backtype.storm.security.auth.SimpleTransportPlugin.connect(SimpleTransportPlugin.java:83)
    at backtype.storm.security.auth.ThriftClient.<init>(ThriftClient.java:63)
    at backtype.storm.utils.NimbusClient.<init>(NimbusClient.java:47)
    at backtype.storm.utils.NimbusClient.<init>(NimbusClient.java:43)
    at backtype.storm.utils.NimbusClient.getConfiguredClient(NimbusClient.java:36)
    ... 4 more
Caused by: java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at org.apache.thrift7.transport.TSocket.open(TSocket.java:178)
    ... 10 more
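The problem was that StormSubmitter tries to connect to a live cluster (its Nimbus daemon), whereas I was only running things locally and had no such cluster. Submitting the topology to a LocalCluster instead solved it: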
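// Replaces the StormSubmitter.submitTopology(...) call in buildAndSubmit();
// needs: import backtype.storm.LocalCluster;
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("kafka-processor",
        conf, builder.createTopology());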
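To confirm this diagnosis before changing the code, one could check whether anything is listening where StormSubmitter expects Nimbus to be. The following is only a minimal sketch using plain JDK sockets, not part of Storm: the class name NimbusPortCheck and the helper isPortOpen are hypothetical, and the host and port are assumptions based on Storm's default settings (nimbus.host of localhost, nimbus.thrift.port of 6627). If your storm.yaml overrides them, adjust accordingly.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Storm: probes a TCP port with plain JDK sockets.
class NimbusPortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isPortOpen(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers "Connection refused", timeouts, and unresolvable hosts.
            return false;
        }
    }

    public static void main(String[] args) {
        String host = "localhost"; // assumption: default nimbus.host
        int port = 6627;           // assumption: default nimbus.thrift.port
        if (isPortOpen(host, port, 2000)) {
            System.out.println("Something is listening on " + host + ":" + port);
        } else {
            System.out.println("Nothing is listening on " + host + ":" + port
                    + "; StormSubmitter would likely fail with Connection refused");
        }
    }
}
```

If the check reports that nothing is listening, either start a Storm cluster (Nimbus and supervisors) or keep using LocalCluster for local testing as shown above.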