如何使用动态 DNS 设置 Public Kafka Broker?

How to Setup a Public Kafka Broker Using a Dynamic DNS?

我使用 3 个 Zookeeper 和每个代理配置了一个包含 3 个代理的 Kafka 集群。下图显示了我的集群的图形表示。

使用主机 192.168.0.10 在同一网络中进行的生产者和消费者测试通过 kafka-console-producerkafka-console-consumer 命令完美运行。

基于该上下文,当我尝试通过 kafka-console-producer.sh --broker-list DYNAMIC_DNS_ADDR:30192,DYNAMIC_DNS_ADDR:30292,DYNAMIC_DNS_ADDR:30392 --topic twitter_tweets 通过 Internet 生成一些数据时,出现以下错误:

[2018-12-10 09:59:20,772] ERROR Error when sending message to topic twitter_tweets with key: null, value: 16 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for twitter_tweets-1: 1505 ms has passed since batch creation plus linger time [2018-12-10 09:59:22,273] WARN [Producer clientId=console-producer] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

Broker 侦听器配置了以下属性:

listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9443
advertised.listeners=PLAINTEXT://192.168.0.241:9092,SSL://192.168.0.241:9443

显然,advertised.listeners 属性 每个代理的 IP 地址都发生了变化。我正在使用 CentOS 6.10Kafka 2.0.1 进行该设置。远程登录测试有效。另一个到 Kafka REST 代理端口的转发正在通过 Internet 工作并列出所有主题。

https://rmoff.net/2018/08/02/kafka-listeners-explained/

您需要两个侦听器 — 一个响应 并通告 内部地址,一个用于外部地址。

关键是您的客户端连接的侦听器将return该侦听器的主机地址和端口.

目前,您正在将外部流量欺骗到内部流量,因此您的 外部 流量正在访问 内部 侦听器.

您需要这样的东西(根据经纪人的要求改变 aws_internal_listener 的 IP/hostname):

KAFKA_LISTENERS: aws_internal_listener://192.168.0.241:9092,external_listener://192.168.0.241:29092

KAFKA_ADVERTISED_LISTENERS: aws_internal_listener://192.168.0.241:9092,external_listener://DYNAMIC_DNS_ADDR:29092

KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: aws_internal_listener:PLAINTEXT,external_listener:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: aws_internal_listener

那么 DYNAMIC_DNS_ADDR 的端口转发器应该将连接重定向到 AWS 节点上的 29092。关键是 external 连接不应该在与 internal 侦听器匹配的主机上的侦听器端口结束(它通告一个内部 192.168.0地址)

使用 kafkacat -L -b DYNAMIC_DNS_ADDR:29092 调试和验证您的配置,如 in the article here 所述。