Test Kafka behaviour when disconnecting one broker (spring-kafka)
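I'm just getting started with Kafka.
I have a cluster with 2 brokers (ids #2 and #3) and a replication factor of 2.
I want to test how Kafka behaves if I disconnect one broker (id #3).
After shutting down #3, my topic info is: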
Topic: CUSTOMER PartitionCount: 5 ReplicationFactor: 2 Configs:
Topic: CUSTOMER Partition: 0 Leader: 2 Replicas: 3,2 Isr: 2
Topic: CUSTOMER Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2
Topic: CUSTOMER Partition: 2 Leader: 2 Replicas: 3,2 Isr: 2
Topic: CUSTOMER Partition: 3 Leader: 2 Replicas: 2,3 Isr: 2
Topic: CUSTOMER Partition: 4 Leader: 2 Replicas: 3,2 Isr: 2
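Every partition is replicated on both brokers, and broker #2 is now the leader for all of them, which is fine.
Publishing messages works, but they are not consumed by my consumer service (I'm using spring-kafka).
At the moment of the disconnection, the consumer logs are: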
2020-04-01 14:51:42.736 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] o.a.k.c.c.internals.AbstractCoordinator:677 [][][] : [Consumer clientId=consumer-6, groupId=NOTIF] Discovered group coordinator 10.0.0.0:9092 (id: 2147483644 rack: null)
2020-04-01 14:51:42.737 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] o.a.k.c.c.internals.AbstractCoordinator:729 [][][] : [Consumer clientId=consumer-6, groupId=NOTIF] Group coordinator 10.0.0.0:9092 (id: 2147483644 rack: null) is unavailable or invalid, will attempt rediscovery
2020-04-01 14:51:42.840 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] o.a.k.c.c.internals.AbstractCoordinator:677 [][][] : [Consumer clientId=consumer-6, groupId=NOTIF] Discovered group coordinator 10.0.0.0:9092 (id: 2147483644 rack: null)
2020-04-01 14:51:42.841 WARN [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] org.apache.kafka.clients.NetworkClient:671 [][][] : [Consumer clientId=consumer-6, groupId=NOTIF] Connection to node 2147483644 could not be established. Broker may not be available.
2020-04-01 14:51:42.841 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] o.a.k.c.c.internals.AbstractCoordinator:729 [][][] : [Consumer clientId=consumer-6, groupId=NOTIF] Group coordinator 10.0.0.0:9092 (id: 2147483644 rack: null) is unavailable or invalid, will attempt rediscovery
2020-04-01 14:51:42.842 WARN [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] org.apache.kafka.clients.NetworkClient:671 [][][] : [Consumer clientId=consumer-6, groupId=NOTIF] Connection to node 3 could not be established. Broker may not be available.
2020-04-01 14:51:43.136 WARN [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] org.apache.kafka.clients.NetworkClient:671 [][][] : [Consumer clientId=consumer-5, groupId=NOTIF] Connection to node 3 could not be established. Broker may not be available.
2020-04-01 14:51:43.184 WARN [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] org.apache.kafka.clients.NetworkClient:671 [][][] : [Consumer clientId=consumer-3, groupId=NOTIF] Connection to node 3 could not be established. Broker may not be available.
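Then nothing.
There is nothing in the ZooKeeper logs.
When I start the broker again, all the pending messages are consumed.
Can you tell me if I'm wrong?
With my topic configuration, I assumed that disconnecting one broker would be possible without any impact.
My Kafka configuration: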
broker.id=2 (not the same value on the other broker)
delete.topic.enable=true
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/share/kafka/logs
num.partitions=2
num.recovery.threads.per.data.dir=1
log.retention.hours=48
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=serv2:2181,serv3:2181,serv5:2181
zookeeper.connection.timeout.ms=6000
default.replication.factor=1
offsets.topic.replication.factor=1
And my ZooKeeper configuration:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/share/zookeeper/data
server.2=serv2:2888:3888;2181
server.3=serv3:2888:3888;2181
server.5=serv5:2888:3888;2181
I create the topic with Spring Kafka:
@Bean
public KafkaAdmin kafkaAdmin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    return new KafkaAdmin(configs);
}

@Bean
public NewTopic notifTopic() {
    // Replication factor = number of entries in the bootstrap server list
    return new NewTopic(notifTopic, partitions, (short) bootstrapAddress.split(",").length);
}
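For reference, the replication factor passed to NewTopic here is simply the number of comma-separated entries in the bootstrap list. A minimal sketch, assuming the kafka.servers value shown in the global configuration of this question; ReplicationFactorSketch and replicationFactorFrom are hypothetical names used only for illustration. As far as I understand, this value applies only to the topic created by this bean and not to Kafka's internal topics.

```java
// Sketch: how the topic's replication factor is derived from the bootstrap list.
final class ReplicationFactorSketch {

    // Counts the comma-separated host:port entries, as the NewTopic bean does.
    static short replicationFactorFrom(String bootstrapServers) {
        return (short) bootstrapServers.split(",").length;
    }

    public static void main(String[] args) {
        // Value taken from the kafka.servers property in the question.
        String servers = "serv2:9092,serv3:9092";
        System.out.println("replication factor: " + replicationFactorFrom(servers));
    }
}
```

One consequence of this design is that the replication factor changes whenever the client's bootstrap list changes, even if the number of brokers in the cluster does not.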
For the consumer:
Configuration:
@EnableKafka
@Configuration
@Profile({ "!mockKafka & !test" })
public class KafkaConfiguration implements KafkaListenerConfigurer {

    @Autowired
    private LocalValidatorFactoryBean validator;

    @Value(value = "${kafka.servers}")
    private String bootstrapAddress;

    @Value(value = "${kafka.groups.notif.name}")
    private String notifGroup;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> containerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<String, Object>(getConsumerFactoryProperties()));
        factory.setConcurrency(5);
        return factory;
    }

    private Map<String, Object> getConsumerFactoryProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, notifGroup);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return props;
    }

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        registrar.setValidator(validator);
    }
}
Listener:
@Service
@Slf4j
@Profile({"!mockKafka & !test"})
@Transactional
@KafkaListener(containerFactory = "containerFactory", topics = { "${kafka.topics.notif.name}" })
public class NotificationListener {

    @KafkaHandler
    public void email(@Payload @Valid EmailNotification record,
            @Header(ContextUtils.HEADER_ACCOUNT) String account,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.OFFSET) long offset,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) long partition) {
        log.info("Consuming message [EMAIL] from topic [{}], partition [{}], offset [{}]", topic, partition, offset);
        ...
    }
}
And my global configuration:
kafka:
  servers: serv2:9092,serv3:9092
  publish.timeout: 3000
  partitions: 5
  topics:
    customer:
      name: CUSTOMER
    notif:
      name: NOTIF
    health:
      name: HEALTH
  groups:
    customer:
      name: CUSTOMER
    notif:
      name: NOTIF
Versions used:
Kafka: 2.4.0
ZooKeeper: 3.5.6
Spring-Kafka: 2.2.12
Thanks.
I think the cause is
offsets.topic.replication.factor=1
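Because the offsets topic is not replicated, the consumer cannot find its position: any partition of __consumer_offsets that lived only on broker #3 is unavailable while that broker is down, which matches the "Group coordinator ... is unavailable or invalid" lines in your log.
According to the documentation, the default is 3 if the property is omitted, but it was overridden to 1 (at least in my Homebrew installation).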
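To tie this back to the consumer log in the question: the node id 2147483644 is not a real broker id. As I understand the client, it opens a dedicated connection to the group coordinator and labels it Integer.MAX_VALUE minus the broker id, so that id points at broker #3. The sketch below decodes it and also shows the mapping I believe the broker uses to pick the __consumer_offsets partition for a group (non-negative hash of the group id modulo offsets.topic.num.partitions, assumed here to be the default of 50). CoordinatorSketch and its method names are hypothetical and are not part of any Kafka API.

```java
// Sketch: relate the coordinator node id from the log to a broker id, and
// find which __consumer_offsets partition would hold a group's offsets.
final class CoordinatorSketch {

    // The coordinator connection id is assumed to be Integer.MAX_VALUE - brokerId.
    static int brokerIdFromCoordinatorNodeId(int coordinatorNodeId) {
        return Integer.MAX_VALUE - coordinatorNodeId;
    }

    // Assumed mapping: non-negative hash of the group id, modulo the number
    // of partitions of the offsets topic.
    static int offsetsPartitionFor(String groupId, int offsetsTopicPartitions) {
        int hash = groupId.hashCode();
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % offsetsTopicPartitions;
    }

    public static void main(String[] args) {
        // Node id copied from the consumer log in the question.
        System.out.println("coordinator broker id: "
                + brokerIdFromCoordinatorNodeId(2147483644));
        // Group id from the question; 50 partitions is an assumption.
        System.out.println("offsets partition for NOTIF: "
                + offsetsPartitionFor("NOTIF", 50));
    }
}
```

If that partition has a single replica and it sits on the stopped broker, no other broker can take over as coordinator for the group, which would explain why consumption resumes only once broker #3 is back.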