卡夫卡监听器不工作!内网隔离
Kafka Listener is not working! It is isolated in intranet
我的 Kafka 节点托管在 Google Cloud Dataproc 中。但是我们发现通过默认初始化脚本安装的Kafka是这样设置的,只允许内网访问。它与外界完全隔绝。 google 云网络外的生产者无法将消息发布到 Kafka,Kafka 消息也无法链接到其外网订阅者。
备注
我已将生产者IP列入白名单
通读其他 Whosebug、博客 post 和文档。我认为这可能是由于 /usr/lib/kafka/server.properties
.
中 Socket Server Settings
的 advertised.listeners
部分
第一个解
I added advertised.listeners=PLAINTEXT://[External_IP]:19092
then sudo /etc/init.d/kafka-server restart
结果
但是,当我尝试使用 Kafkacat 或 telnet 时,它总是失败。我还用各种端口测试了 advertised.listeners
第二个解决方案来自https://rmoff.net/2018/08/02/kafka-listeners-explained/
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
->>>>>>> I added below listener config according to https://rmoff.net/2018/08/02/kafka-listeners-explained/
listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:19092
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
advertised.listeners=EXTERNAL://[External_IP]:19092,INTERNAL://[Internal_IP]:9092
inter.broker.listener.name=INTERNAL
结果
结果和上面一样,不工作。
防火墙规则 [已更新]
这是我当前的防火墙规则配置。我做错了吗?
谁能帮我解决这个问题?
以下是适用于我的集群的方法:
我从第二种解决方案中设置了以下属性:
listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:19092
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
advertised.listeners=EXTERNAL://[External_IP]:19092,INTERNAL://[Internal_IP]:9092
inter.broker.listener.name=INTERNAL
我创建了一条防火墙规则,将端口 19092 开放到我的个人开发机器 IP 并将其应用于网络。 从我的机器,我尝试远程登录 kafka 服务器,我得到:
$ telnet [EXTERNAL-IP] 19092
Trying [EXTERNAL-IP]...
Connected to [EXTERNAL-IP].
Escape character is '^]'.
然后我尝试使用kafkacat,但出现错误。 运行 在调试中,我看到错误是因为我没有设置任何主题:
%7|1578351264.551|METADATA|rdkafka#producer-1| [thrd:main]: [EXTERNAL-IP]:19092/bootstrap: ===== Received metadata: application requested =====
%7|1578351264.551|METADATA|rdkafka#producer-1| [thrd:main]: [EXTERNAL-IP]:19092/bootstrap: ClusterId: jYxfi6zzR0euAovYyKCFZg, ControllerId: -1
%7|1578351264.551|METADATA|rdkafka#producer-1| [thrd:main]: [EXTERNAL-IP]:19092/bootstrap: 0 brokers, 0 topics
%7|1578351264.551|METADATA|rdkafka#producer-1| [thrd:main]: [EXTERNAL-IP]:19092/bootstrap: No brokers or topics in metadata: should retry
%7|1578351264.551|REQERR|rdkafka#producer-1| [thrd:main]: [EXTERNAL-IP]:19092/bootstrap: MetadataRequest failed: Local: Partial response: explicit actions Retry
%7|1578351264.551|RETRY|rdkafka#producer-1| [thrd:[EXTERNAL-IP]:19092/bootstrap]: [EXTERNAL-IP]:19092/bootstrap: Retrying MetadataRequest (v2, 25 bytes, retry 1/2, prev CorrId 3) in 100ms
请注意,我已尝试从集群外部连接到 kafka 服务器。在问题中,telnet和kafkacat与kafka服务器(kafka-tng-w-0)在同一台机器上运行。
这是一个示例 docker-compose.yaml 文件。
version: '2'
services:
zookeeper:
image: strimzi/kafka:0.20.0-kafka-2.6.0
command: [
"sh", "-c",
"bin/zookeeper-server-start.sh config/zookeeper.properties"
]
ports:
- "2181:2181"
environment:
LOG_DIR: /tmp/logs
kafka:
image: strimzi/kafka:0.20.0-kafka-2.6.0
command: [
"sh", "-c",
"bin/kafka-server-start.sh config/server.properties --override
listeners=$${KAFKA_LISTENERS} --override
advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override
zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
]
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
LOG_DIR: "/tmp/logs"
# Dev GQ - Laptop
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.23.240.1:9092
# AWS Pre-Prod
#KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://11.122.200.229:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
这是一个示例 Quarkus application.properties 文件,其中 kafka bootstrap 服务器配置为 docker-compose.yaml.
中的广告侦听器
# Configure the SmallRye Kafka connector
# Dev GQ - Laptop
mp.messaging.connector.smallrye-kafka.bootstrap.servers=172.23.240.1:9092
# AWS Pre-Prod
#mp.messaging.connector.smallrye-kafka.bootstrap.servers=11.122.200.229:9092
quarkus.kafka.health.enabled=true
# Configure the Kafka sink (we write to it)
mp.messaging.outgoing.generated-price.connector=smallrye-kafka
mp.messaging.outgoing.generated-price.topic=prices
mp.messaging.outgoing.generated-price.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer
# Configure the Kafka source (we read from it)
mp.messaging.incoming.prices.connector=smallrye-kafka
mp.messaging.incoming.prices.topic=prices
# ..... more codes
我的 Kafka 节点托管在 Google Cloud Dataproc 中。但是我们发现通过默认初始化脚本安装的Kafka是这样设置的,只允许内网访问。它与外界完全隔绝。 google 云网络外的生产者无法将消息发布到 Kafka,Kafka 消息也无法链接到其外网订阅者。
备注
我已将生产者IP列入白名单
通读其他 Whosebug、博客 post 和文档。我认为这可能是由于 /usr/lib/kafka/server.properties
.
Socket Server Settings
的 advertised.listeners
部分
第一个解
I added
advertised.listeners=PLAINTEXT://[External_IP]:19092
then
sudo /etc/init.d/kafka-server restart
结果
但是,当我尝试使用 Kafkacat 或 telnet 时,它总是失败。我还用各种端口测试了 advertised.listeners
第二个解决方案来自https://rmoff.net/2018/08/02/kafka-listeners-explained/
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
->>>>>>> I added below listener config according to https://rmoff.net/2018/08/02/kafka-listeners-explained/
listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:19092
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
advertised.listeners=EXTERNAL://[External_IP]:19092,INTERNAL://[Internal_IP]:9092
inter.broker.listener.name=INTERNAL
结果
结果和上面一样,不工作。
防火墙规则 [已更新]
这是我当前的防火墙规则配置。我做错了吗?
谁能帮我解决这个问题?
以下是适用于我的集群的方法:
我从第二种解决方案中设置了以下属性:
listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:19092
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
advertised.listeners=EXTERNAL://[External_IP]:19092,INTERNAL://[Internal_IP]:9092
inter.broker.listener.name=INTERNAL
我创建了一条防火墙规则,将端口 19092 开放到我的个人开发机器 IP 并将其应用于网络。 从我的机器,我尝试远程登录 kafka 服务器,我得到:
$ telnet [EXTERNAL-IP] 19092
Trying [EXTERNAL-IP]...
Connected to [EXTERNAL-IP].
Escape character is '^]'.
然后我尝试使用kafkacat,但出现错误。 运行 在调试中,我看到错误是因为我没有设置任何主题:
%7|1578351264.551|METADATA|rdkafka#producer-1| [thrd:main]: [EXTERNAL-IP]:19092/bootstrap: ===== Received metadata: application requested =====
%7|1578351264.551|METADATA|rdkafka#producer-1| [thrd:main]: [EXTERNAL-IP]:19092/bootstrap: ClusterId: jYxfi6zzR0euAovYyKCFZg, ControllerId: -1
%7|1578351264.551|METADATA|rdkafka#producer-1| [thrd:main]: [EXTERNAL-IP]:19092/bootstrap: 0 brokers, 0 topics
%7|1578351264.551|METADATA|rdkafka#producer-1| [thrd:main]: [EXTERNAL-IP]:19092/bootstrap: No brokers or topics in metadata: should retry
%7|1578351264.551|REQERR|rdkafka#producer-1| [thrd:main]: [EXTERNAL-IP]:19092/bootstrap: MetadataRequest failed: Local: Partial response: explicit actions Retry
%7|1578351264.551|RETRY|rdkafka#producer-1| [thrd:[EXTERNAL-IP]:19092/bootstrap]: [EXTERNAL-IP]:19092/bootstrap: Retrying MetadataRequest (v2, 25 bytes, retry 1/2, prev CorrId 3) in 100ms
请注意,我已尝试从集群外部连接到 kafka 服务器。在问题中,telnet和kafkacat与kafka服务器(kafka-tng-w-0)在同一台机器上运行。
这是一个示例 docker-compose.yaml 文件。
version: '2'
services:
zookeeper:
image: strimzi/kafka:0.20.0-kafka-2.6.0
command: [
"sh", "-c",
"bin/zookeeper-server-start.sh config/zookeeper.properties"
]
ports:
- "2181:2181"
environment:
LOG_DIR: /tmp/logs
kafka:
image: strimzi/kafka:0.20.0-kafka-2.6.0
command: [
"sh", "-c",
"bin/kafka-server-start.sh config/server.properties --override
listeners=$${KAFKA_LISTENERS} --override
advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override
zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
]
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
LOG_DIR: "/tmp/logs"
# Dev GQ - Laptop
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.23.240.1:9092
# AWS Pre-Prod
#KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://11.122.200.229:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
这是一个示例 Quarkus application.properties 文件,其中 kafka bootstrap 服务器配置为 docker-compose.yaml.
中的广告侦听器# Configure the SmallRye Kafka connector
# Dev GQ - Laptop
mp.messaging.connector.smallrye-kafka.bootstrap.servers=172.23.240.1:9092
# AWS Pre-Prod
#mp.messaging.connector.smallrye-kafka.bootstrap.servers=11.122.200.229:9092
quarkus.kafka.health.enabled=true
# Configure the Kafka sink (we write to it)
mp.messaging.outgoing.generated-price.connector=smallrye-kafka
mp.messaging.outgoing.generated-price.topic=prices
mp.messaging.outgoing.generated-price.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer
# Configure the Kafka source (we read from it)
mp.messaging.incoming.prices.connector=smallrye-kafka
mp.messaging.incoming.prices.topic=prices
# ..... more codes