如何使用 spring-kafka 为监听器传递多个 bootstrap 服务器
How to pass multiple bootstrap servers for listener using spring-kafka
我有一个侦听器需要从多个具有相同主题的 kafka 服务器读取,这些服务器都配置在一个 zookeeper 下。我如何从这些多台服务器读取。你能帮忙解决一下吗?
我可以指向 zookeeper 而不是 Kafka 服务器吗?
@KafkaListener
需要 KafkaListenerContainerFactory
@Bean
,而后者又基于 ConsumerFactory
。 DefaultKafkaConsumerFactory
接受 Map<String, Object>
消费者配置:
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
...
return props;
}
}
其中 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
正是标准的 Apache Kafka bootstrap.servers
property:
A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).
不行,你不能指向Zookeeper地址。 Kafka 不再支持它。
映射具有键:字符串和值:对象。
key:ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG 的对象应该是由 ',' 分隔的一系列主机端口对,例如:host1:port1,host2:port2,host3:port3 ... ...
例如监听三个服务器:localhost:9092,192.168.22.12:9088,localhost:7898
我有一个侦听器需要从多个具有相同主题的 kafka 服务器读取,这些服务器都配置在一个 zookeeper 下。我如何从这些多台服务器读取。你能帮忙解决一下吗?
我可以指向 zookeeper 而不是 Kafka 服务器吗?
@KafkaListener
需要 KafkaListenerContainerFactory
@Bean
,而后者又基于 ConsumerFactory
。 DefaultKafkaConsumerFactory
接受 Map<String, Object>
消费者配置:
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
...
return props;
}
}
其中 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
正是标准的 Apache Kafka bootstrap.servers
property:
A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).
不行,你不能指向Zookeeper地址。 Kafka 不再支持它。
映射具有键:字符串和值:对象。
key:ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG 的对象应该是由 ',' 分隔的一系列主机端口对,例如:host1:port1,host2:port2,host3:port3 ... ...
例如监听三个服务器:localhost:9092,192.168.22.12:9088,localhost:7898