Kafka Java 生产者无法向 kafka 实例发送消息
Kafka Java Producer is not able to send message to kafka instance
我是 运行 docker 容器中的一个 kafka 实例,具有以下 docker-compose.yml 文件。
version: "3"
services:
zookeeper:
image: 'bitnami/zookeeper:latest'
ports:
- '2181:2181'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: 'bitnami/kafka:latest'
ports:
- '9092:9092'
- '9093:9093'
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_BROKER_ID=1
- KAFKA_CREATE_TOPICS="topic_name:1:3"
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://127.0.0.1:9092,EXTERNAL://127.0.0.1:9093
- KAFKA_INTER_BROKER_LISTENER_NAME=EXTERNAL
depends_on:
- zookeeper
运行 非常好。我通过kafkacat发送数据来测试kafka。没问题,我可以通过 kafka 消费者接收数据。请检查以下 kafkacat 命令。
kafkacat -P -b 127.0.0.1:9092 -t topic_name
kafkacat -C -b 127.0.0.1:9092 -t topic_name
然而,当我尝试通过 java 生产者代码发送它时,我无法从 kafkacat 消费者那里收到。请检查下面的 java 生产者代码。我想听听你的建议?提前致谢
public class DataProducer {
public static void main(String[] args) {
KafkaTemplate<String,String> kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(kafkaConfig()));
kafkaTemplate.send("topic_name", "test");
}
public static Map<String, Object> kafkaConfig() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}}
还在 kafkacat 命令输出后添加元数据。
kafkacat -b 127.0.0.1:9092 -L
Metadata for all topics (from broker 1: 127.0.0.1:9092/1):
1 brokers:
broker 1 at 127.0.0.1:9092 (controller)
2 topics:
topic "topic_name" with 1 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
topic "__consumer_offsets" with 50 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
partition 1, leader 1, replicas: 1, isrs: 1
..
partition 48, leader 1, replicas: 1, isrs: 1
partition 49, leader 1, replicas: 1, isrs: 1
代理配置似乎没问题,因为您得到了正确的元数据。
我认为问题出在您的代码中。 kafkaTemplate.send()
是一个异步操作,您的流程很可能在生产者设法实际发送消息之前结束。尝试向该发送方法添加一个 .get()
以强制它同步。
kafkaTemplate.send("topic_name", "test").get();
我有类似的问题。这很奇怪,但是 kafka-clients 版本的 Maven 依赖性从 latest(2.6.0)
到 2.0.0
(或 up to 2.5.0
)的变化帮助了我。
我是 运行 docker 容器中的一个 kafka 实例,具有以下 docker-compose.yml 文件。
version: "3"
services:
zookeeper:
image: 'bitnami/zookeeper:latest'
ports:
- '2181:2181'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: 'bitnami/kafka:latest'
ports:
- '9092:9092'
- '9093:9093'
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_BROKER_ID=1
- KAFKA_CREATE_TOPICS="topic_name:1:3"
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://127.0.0.1:9092,EXTERNAL://127.0.0.1:9093
- KAFKA_INTER_BROKER_LISTENER_NAME=EXTERNAL
depends_on:
- zookeeper
运行 非常好。我通过kafkacat发送数据来测试kafka。没问题,我可以通过 kafka 消费者接收数据。请检查以下 kafkacat 命令。
kafkacat -P -b 127.0.0.1:9092 -t topic_name
kafkacat -C -b 127.0.0.1:9092 -t topic_name
然而,当我尝试通过 java 生产者代码发送它时,我无法从 kafkacat 消费者那里收到。请检查下面的 java 生产者代码。我想听听你的建议?提前致谢
public class DataProducer {
public static void main(String[] args) {
KafkaTemplate<String,String> kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(kafkaConfig()));
kafkaTemplate.send("topic_name", "test");
}
public static Map<String, Object> kafkaConfig() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}}
还在 kafkacat 命令输出后添加元数据。
kafkacat -b 127.0.0.1:9092 -L
Metadata for all topics (from broker 1: 127.0.0.1:9092/1):
1 brokers:
broker 1 at 127.0.0.1:9092 (controller)
2 topics:
topic "topic_name" with 1 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
topic "__consumer_offsets" with 50 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
partition 1, leader 1, replicas: 1, isrs: 1
..
partition 48, leader 1, replicas: 1, isrs: 1
partition 49, leader 1, replicas: 1, isrs: 1
代理配置似乎没问题,因为您得到了正确的元数据。
我认为问题出在您的代码中。 kafkaTemplate.send()
是一个异步操作,您的流程很可能在生产者设法实际发送消息之前结束。尝试向该发送方法添加一个 .get()
以强制它同步。
kafkaTemplate.send("topic_name", "test").get();
我有类似的问题。这很奇怪,但是 kafka-clients 版本的 Maven 依赖性从 latest(2.6.0)
到 2.0.0
(或 up to 2.5.0
)的变化帮助了我。