Kafka 生产者无法通过 Internet 连接到代理。如果在本地网络中作为经纪人,效果很好。从 Internet 工作的 Telnet 连接

Kafka producer unable to connect to broker via Internet. Works well if in local network as broker. Telnet connectivity working from internet

我在家庭网络的计算机上有一个 Kafka 代理 运行。当我将我的 java 生产者(在另一台计算机上)从同一个 wifi 网络连接到这个代理时,它工作得非常好并且能够毫无问题地 post 消息。

作为我应用程序的后续步骤,我已经在互联网上公开了我的计算机和 Kafka 的 9092 端口。我还为此端口打开了系统上的所有防火墙。当我从任何其他网络进行 telnet/nc 连接尝试时,它连接良好。 Kafka 主机还在 netstat 输出中显示了建立的连接。

 telnet DNSHOSTNAME 9092
Trying MYIP...
Connected to DNSHOSTNAME.
Escape character is '^]'.

但是从这个外部网络,当我使用完全相同的 kafka java 生产者从这个外部网络发送到我的家用计算机时,它总是失败并出现以下错误。我也没有在与此相关的 Kafka 日志中看到任何连接尝试或错误。

    Error while producing message to topic :TP1-0@-1
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for TP1-0: 30005 ms has passed since batch creation plus linger time

另一个有趣的地方是,我还有其他应用程序 - RabbitMQ 和 Tomcat 也与 Kafka 在相同的 machine 上,具有相同的防火墙和路由器规则。我的同一个 java 应用程序能够通过互联网连接通过 TCP 和 HTTP 向他们发布。只有 Kafka 不工作。

前提是telnet和nc连接正常,我认为这不是防火墙或网络路由的问题。这让我只剩下生产者代码,如下所示。是否有某种类型的设置可以应用于 Kafka Broker 或 Kafka Producer 端,以便他们能够通过互联网连接?与消费者代码相同的问题。无法从互联网连接。

生产者代码:

   public static void main(String[] args){

       Properties props = new Properties();
       //props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "DNSHOSTNAME:9092");       
       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "MYEXTERNALIP:9092");

       props.put(ProducerConfig.ACKS_CONFIG, "all");
       props.put(ProducerConfig.RETRIES_CONFIG, 0);
       props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
       props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

       Producer<String, String> producer = new KafkaProducer<String, String>(props);
       TestCallback callback = new TestCallback();
       Random rnd = new Random();
           
           ProducerRecord<String, String> data = new ProducerRecord<String, String>("TP1", "s2", "TEST DATA" );
           producer.send(data, callback);
       producer.close();
   }


   private static class TestCallback implements Callback {
       @Override
       public void onCompletion(RecordMetadata recordMetadata, Exception e) {
           if (e != null) {
               System.out.println("Error while producing message to topic :" + recordMetadata);
               e.printStackTrace();
           } else {
               String message = String.format("sent message to topic:%s partition:%s  offset:%s", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
               System.out.println(message);
           }
       }
   }

已完成故障排除步骤:

  1. 在 3 台不同的计算机上部署了 Kafka,2 台 windows 和 1 台 mac。相同的结果。在同一网络上运行良好,而不是在互联网上运行时。 Telnet 与来自 internet/outside 网络的每台 kafka 计算机一起工作。
  2. 尝试了 3 种不同的外部网络,一种是我的小区 phone 的热点。
  3. 在 kafka machine 上也尝试了其他软件,如上所述,它们工作正常。
  4. 完全禁用系统防火墙只是为了确保没有防火墙问题,但运气不佳。

有人可以帮我解决这个问题吗?

提前致谢!

您描述的问题可以通过 advertised.listeners

在互联网上为每个经纪人“宣传”一个地址来解决

telnet 和 nc 检查端口是否打开(listeners 配置),但无法检查代理 bootstrap 是否正确返回客户端,您可以改用 kafkacat -L -b <bootstrap> ,这将 return 集群中的代理列表,并且应该可以从您想要 运行 客户端

的位置连接到每个代理

如果您的单个 WAN 地址后面有多个代理,他们将不得不通告/转发单独的端口以实现完全连接


替代解决方案包括 Kafka REST 代理

无论哪种情况,都应该将 SSL 添加到连接中