Bootstrap broker not being used to consume from topic
A simple spring-boot-kafka application that consumes from a topic on a network cluster:
Error:
Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
Connection to node -1 (localhost/127.0.0.1:9092) could not be
established. Broker may not be available.
Puzzle:
The configured broker is not local, it's BROKER_1.FOO.NET:9094, and it
is available.
pom.xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.1</version>
</dependency>
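Configuration class:
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Slf4j
@EnableKafka
@Configuration
@PropertySource("dv/application.properties")
public class KafkaConfig {
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "BROKER_1.FOO.NET:9094");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(config);
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory concurrentKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}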
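Consumer class:
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class KafkaConsumer {
    @KafkaListener(topics = "foo_topic", groupId = "group_1")
    public void consume(String message) {
        log.info(message);
    }
}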
it's BROKER_1.FOO.NET:9094, and it is available.
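The bootstrap port may well be available and responding to requests, but that broker then returns whatever it has configured as advertised.listeners.
Based on your error, either
- that is set to localhost/127.0.0.1:9092, or
- you are getting the default Spring property for the bootstrap servers configuration.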
Despite the ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG setting, the correct bootstrap server was not being used. The ConsumerConfig values in the log showed: bootstrap.servers = [localhost:9092].
I needed to change the bean name from "concurrentKafkaListenerContainerFactory" to "kafkaListenerContainerFactory". So:
@Bean
public ConcurrentKafkaListenerContainerFactory concurrentKafkaListenerContainerFactory() {
needed to change to:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
Once I made this single change, the bootstrap server was correctly picked up from the configured value.
The ConsumerConfig log now shows:
bootstrap.servers=[BROKER_1.FOO.NET:9094]
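Why the rename matters can be sketched with a toy model. This is not Spring code: the class, the map standing in for the application context, and both helper methods are hypothetical. It assumes Spring Boot's Kafka auto-configuration is active, that it supplies a factory named "kafkaListenerContainerFactory" pointing at localhost:9092 unless a bean with that name already exists, and that a @KafkaListener with no containerFactory attribute looks up that same name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model (NOT Spring code) of how a listener picks its container factory by bean name. */
public class FactoryLookupSketch {

    // Bean name a @KafkaListener falls back to when no containerFactory is given.
    static final String DEFAULT_FACTORY_BEAN_NAME = "kafkaListenerContainerFactory";

    // Bootstrap address assumed for the auto-configured factory in this sketch.
    static final String AUTO_CONFIGURED_BOOTSTRAP = "localhost:9092";

    /** Hypothetical stand-in for the application context: bean name -> bootstrap servers. */
    static Map<String, String> buildContext(String userBeanName, String userBootstrap) {
        Map<String, String> beans = new LinkedHashMap<>();
        beans.put(userBeanName, userBootstrap);
        // The auto-configured factory only backs off if the default name is already taken.
        beans.putIfAbsent(DEFAULT_FACTORY_BEAN_NAME, AUTO_CONFIGURED_BOOTSTRAP);
        return beans;
    }

    /** Returns the bootstrap servers of the factory a listener would end up using. */
    static String resolveBootstrap(Map<String, String> beans, String containerFactory) {
        String name = (containerFactory == null || containerFactory.isEmpty())
                ? DEFAULT_FACTORY_BEAN_NAME
                : containerFactory;
        String bootstrap = beans.get(name);
        if (bootstrap == null) {
            throw new IllegalArgumentException("No factory bean named " + name);
        }
        return bootstrap;
    }

    public static void main(String[] args) {
        String broker = "BROKER_1.FOO.NET:9094";
        Map<String, String> before = buildContext("concurrentKafkaListenerContainerFactory", broker);
        Map<String, String> after = buildContext("kafkaListenerContainerFactory", broker);

        System.out.println(resolveBootstrap(before, "")); // localhost:9092
        System.out.println(resolveBootstrap(after, ""));  // BROKER_1.FOO.NET:9094
        // Naming the factory explicitly also reaches the user-defined bean.
        System.out.println(resolveBootstrap(before, "concurrentKafkaListenerContainerFactory")); // BROKER_1.FOO.NET:9094
    }
}
```

The last call mirrors the real containerFactory attribute of @KafkaListener, which can point a listener at a differently named factory bean. I only tested the rename, so treat that route as an untested alternative.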
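Also, the bootstrap servers do not need advertised.listeners to be configured, despite what this article implies: https://www.confluent.io/blog/kafka-listeners-explained/.
When I asked our Kafka admin about adding advertised.listeners, he told me he had actually removed them for simplicity and sent me the following configuration description: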
advertised.listeners
Listeners to publish to ZooKeeper for clients to use, if different than
the listeners config property. In IaaS environments, this may need to be
different from the interface to which the broker binds. If this is not
set, the value for listeners will be used. Unlike listeners, it is not
valid to advertise the 0.0.0.0 meta-address. Also unlike listeners,
there can be duplicated ports in this property, so that one listener can
be configured to advertise another listener's address. This can be
useful in some cases where external load balancers are used.
He was right, because I got the correct, expected metadata when using kafkacat:
kafkacat -b BROKER_1.FOO.NET:9094 -L
which returns:
Metadata for all topics (from broker 2: sasl_ssl://xx.foo.net:9094/2):
5 brokers:
broker 5 at xx22.ttgtpmg.net:9094 (controller)
broker 4 at xx21.ttgtpmg.net:9094
broker 1 at xx18.ttgtpmg.net:9094
broker 2 at xx19.ttgtpmg.net:9094
broker 3 at xx20.ttgtpmg.net:9094