Connection refused while trying to run a Storm topology to consume messages from Kafka

I have a Kafka broker running on my localhost. The ZooKeeper instance that ships with Kafka is also running on my localhost.

I created a simple topology that consumes messages from the Kafka broker. However, when I run the topology, I get a "Connection refused" exception.

Below is the code for my topology:

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class KafkaTopology 
{
    private static final String KAFKA_SPOUT_ID = "kafkaSpout"; 
    private static final String KAFKA_BOLT_ID = "kafkaBolt";

    public KafkaTopology(){

    }

    private SpoutConfig constructKafkaSpoutConf() 
    {
        BrokerHosts hosts = new ZkHosts("localhost:2181");
        String topic = "topic2";
        String zkRoot = "/topic2";
        String consumerGroupId = "StormSpout";

        SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, consumerGroupId);

        return spoutConfig;
    }



    public void configureKafkaSpout(TopologyBuilder builder) 
    {
        KafkaSpout kafkaSpout = new KafkaSpout(constructKafkaSpoutConf());
        builder.setSpout(KAFKA_SPOUT_ID, kafkaSpout);
    }

    public void configureKafkaBolt(TopologyBuilder builder)
    {
        KafkaBolt kafkaBolt = new KafkaBolt();
        builder.setBolt(KAFKA_BOLT_ID, kafkaBolt).globalGrouping(KAFKA_SPOUT_ID);
    }

    private void buildAndSubmit() throws Exception
    {
        TopologyBuilder builder = new TopologyBuilder();
        configureKafkaSpout(builder);
        configureKafkaBolt(builder);

        Config conf = new Config();
        conf.setDebug(true);

        StormSubmitter.submitTopology("kafka-processor", 
                                    conf, builder.createTopology());
    }

    public static void main(String[] str) throws Exception
    {
        KafkaTopology kafkaTopology  = new KafkaTopology();
        kafkaTopology.buildAndSubmit();
    }

}

The exception I get is as follows:

    Exception in thread "main" java.lang.RuntimeException: org.apache.thrift7.transport.TTransportException: java.net.ConnectException: Connection refused
        at backtype.storm.utils.NimbusClient.getConfiguredClient(NimbusClient.java:38)
        at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:116)
        at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:70)
        at udacity.storm.KafkaTopology.buildAndSubmit(KafkaTopology.java:66)
        at udacity.storm.KafkaTopology.main(KafkaTopology.java:73)
    Caused by: org.apache.thrift7.transport.TTransportException: java.net.ConnectException: Connection refused
        at org.apache.thrift7.transport.TSocket.open(TSocket.java:183)
        at org.apache.thrift7.transport.TFramedTransport.open(TFramedTransport.java:81)
        at backtype.storm.security.auth.SimpleTransportPlugin.connect(SimpleTransportPlugin.java:83)
        at backtype.storm.security.auth.ThriftClient.<init>(ThriftClient.java:63)
        at backtype.storm.utils.NimbusClient.<init>(NimbusClient.java:47)
        at backtype.storm.utils.NimbusClient.<init>(NimbusClient.java:43)
        at backtype.storm.utils.NimbusClient.getConfiguredClient(NimbusClient.java:36)
        ... 4 more
    Caused by: java.net.ConnectException: Connection refused
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:589)
        at org.apache.thrift7.transport.TSocket.open(TSocket.java:178)
        ... 10 more

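The problem was that the code tried to connect to a live cluster, while I meant to run the topology on a local cluster. StormSubmitter.submitTopology connects to Nimbus (hence NimbusClient in the stack trace), and nothing was listening at the configured Nimbus address, so the connection was refused. Submitting through LocalCluster instead solved it: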

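    // Requires: import backtype.storm.LocalCluster;
    // Runs the topology in-process instead of submitting it to Nimbus.
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("kafka-processor",
                                conf, builder.createTopology());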
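
If you do want to submit to a real cluster, a quick way to confirm the diagnosis is to check whether anything is listening on the Nimbus Thrift port before calling StormSubmitter. The following is only a minimal sketch using plain java.net: the class name PortCheck and the helper isListening are hypothetical, and it assumes Nimbus is expected on localhost at Storm's default nimbus.thrift.port of 6627 (adjust both if your storm.yaml says otherwise).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Storm: probes a TCP port.
final class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // "Connection refused" (ConnectException) and timeouts both land here.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: Nimbus on localhost with the default Thrift port 6627.
        boolean up = isListening("localhost", 6627, 1000);
        System.out.println(up
            ? "Something is listening on port 6627; Nimbus may be reachable."
            : "Nothing is listening on port 6627; start Nimbus or use LocalCluster.");
    }
}
```

If the check reports that nothing is listening, the same "Connection refused" from StormSubmitter is expected, and the choice is between starting Nimbus and running with LocalCluster as above.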