Spring Boot Kafka Consumers 不按照消息产生的顺序消费
Spring Boot Kafka Consumers does not consume in the order the messages are produced
我制作了两个Spring Boot 项目。一个是 kafka 制作人,另一个是听众。
然后我有一个这样的 dockerfile:
我为 Zookeeper 和 Kafka 创建了一个容器,还有一个生产者容器和两个消费者容器。
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
restart: always
ports:
- 2181:2181
kafka:
image: wurstmeister/kafka
container_name: kafka
restart: always
ports:
- 9092:9092
depends_on:
- zookeeper
links:
- zookeeper:zookeeper
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
consumer1:
image: consumer:0.0.1-SNAPSHOT
container_name: consumer1
depends_on:
- kafka
restart: always
ports:
- 8081:8081
environment:
SERVER_PORT: 8081
depends_on:
- kafka
links:
- kafka:kafka
consumer2:
image: consumer:0.0.1-SNAPSHOT
container_name: consumer2
depends_on:
- kafka
restart: always
ports:
- 8082:8082
environment:
SERVER_PORT: 8082
depends_on:
- kafka
links:
- kafka:kafka
producer:
image: producer:0.0.1-SNAPSHOT
container_name: producer
depends_on:
- kafka
restart: always
ports:
- 8080:8080
environment:
SERVER_PORT: 8080
depends_on:
- kafka
links:
- kafka:kafka
现在来解决我的问题。我希望我的消费者从我已经完成的同一主题中消费。
但是 - 似乎他们没有按照生产者生成消息的顺序使用消息。
如下图所示,“数字:4”先于“数字:3”被消耗,例如:
producer | i: 0
consumer2 | Number: 0
producer | i: 1
consumer2 | Number: 1
producer | i: 2
consumer1 | Number: 2
producer | i: 3
producer | i: 4
consumer2 | Number: 4
producer | i: 5
consumer1 | Number: 6
producer | i: 6
consumer2 | Number: 3
producer | i: 7
producer | i: 8
producer | i: 9
consumer2 | Number: 5
producer | i: 10
consumer1 | Number: 10
producer | i: 11
我的 KafkaProducer class:
@Service
public class KafkaProducer {
@Value("${topic.name.producer}")
private String topicName;
@Autowired
private KafkaTemplate<String, String> kafkaStringTemplate;
public void sendList(String word) {
kafkaStringTemplate.send(topicName, word);
}
}
我有一个 for 循环为这个循环提供
for (int i = 0; i < 100; i++) {
producer.sendList("Number: " + i);
}
在我的生产者项目中,我有一个 TopicConfiguration:
@Configuration
public class TopicConfiguration
{
@Value(value = "${spring.kafka.producer.bootstrap-servers}")
private String bootstrapAddress;
@Value(value = "${topic.name.producer}")
private String topicName;
@Bean
public NewTopic generalTopic() {
return TopicBuilder.name(topicName)
.partitions(3)
.replicas(1)
.build();
}
@Bean
public KafkaAdmin kafkaAdmin()
{
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
}
Application.Properties 文件:
server.port=${SERVER_PORT}
# Producer properties
spring.kafka.producer.bootstrap-servers=kafka:9092
#spring.kafka.producer.bootstrap-servers=172.21.0.2:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.group-id=group-1
topic.name.producer=test
# Common Kafka Properties
auto.create.topics.enable=true
我的消费者项目:
@Service
public class KafkaConsumer {
@Value("${topic.name.consumer")
private String topicName;
@KafkaListener(topics = "${topic.name.consumer}", groupId = "group-1")
public void consumeLinks(String word) throws InterruptedException {
System.out.println(word);
Thread.sleep(5000);
}
}
应用程序属性文件:
#server.port=8081
server.port=${SERVER_PORT}
# Producer properties
spring.kafka.consumer.bootstrap-servers=kafka:9092
#spring.kafka.consumer.bootstrap-servers=172.21.0.2:9092
#spring.kafka.consumer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
#spring.kafka.consumer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.max-poll-records=1
spring.kafka.consumer.group-id=group-1
topic.name.consumer=test
spring.kafka.consumer.auto-offset-reset=earliest
# Common Kafka Properties
auto.create.topics.enable=true
我已经尝试 google 但没有找到任何解决方案,或者我没有问题不明白如何解决。是否有人可以告诉我缺少什么或有一个 link 页面供傻瓜如何解决?
您的主题有 3 个分区。除非您只使用一个分区,否则将无法保证顺序。更具体地说,数据仅在分区内排序;每个消费者正在消费在其已分配分区.
中订购的数据
要显示此内容,请尝试
@KafkaListener(topics = "${topic.name.consumer}", groupId = "group-1")
public void consumeLinks(
@Payload String word,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(
"Received Message: " + word
+ " from partition: " + partition);
如果你使用一个分区,那意味着你在那个消费者组中只能有一个消费者。
我制作了两个Spring Boot 项目。一个是 kafka 制作人,另一个是听众。 然后我有一个这样的 dockerfile: 我为 Zookeeper 和 Kafka 创建了一个容器,还有一个生产者容器和两个消费者容器。
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
restart: always
ports:
- 2181:2181
kafka:
image: wurstmeister/kafka
container_name: kafka
restart: always
ports:
- 9092:9092
depends_on:
- zookeeper
links:
- zookeeper:zookeeper
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
consumer1:
image: consumer:0.0.1-SNAPSHOT
container_name: consumer1
depends_on:
- kafka
restart: always
ports:
- 8081:8081
environment:
SERVER_PORT: 8081
depends_on:
- kafka
links:
- kafka:kafka
consumer2:
image: consumer:0.0.1-SNAPSHOT
container_name: consumer2
depends_on:
- kafka
restart: always
ports:
- 8082:8082
environment:
SERVER_PORT: 8082
depends_on:
- kafka
links:
- kafka:kafka
producer:
image: producer:0.0.1-SNAPSHOT
container_name: producer
depends_on:
- kafka
restart: always
ports:
- 8080:8080
environment:
SERVER_PORT: 8080
depends_on:
- kafka
links:
- kafka:kafka
现在来解决我的问题。我希望我的消费者从我已经完成的同一主题中消费。 但是 - 似乎他们没有按照生产者生成消息的顺序使用消息。
如下图所示,“数字:4”先于“数字:3”被消耗,例如:
producer | i: 0
consumer2 | Number: 0
producer | i: 1
consumer2 | Number: 1
producer | i: 2
consumer1 | Number: 2
producer | i: 3
producer | i: 4
consumer2 | Number: 4
producer | i: 5
consumer1 | Number: 6
producer | i: 6
consumer2 | Number: 3
producer | i: 7
producer | i: 8
producer | i: 9
consumer2 | Number: 5
producer | i: 10
consumer1 | Number: 10
producer | i: 11
我的 KafkaProducer class:
@Service
public class KafkaProducer {
@Value("${topic.name.producer}")
private String topicName;
@Autowired
private KafkaTemplate<String, String> kafkaStringTemplate;
public void sendList(String word) {
kafkaStringTemplate.send(topicName, word);
}
}
我有一个 for 循环为这个循环提供
for (int i = 0; i < 100; i++) {
producer.sendList("Number: " + i);
}
在我的生产者项目中,我有一个 TopicConfiguration:
@Configuration
public class TopicConfiguration
{
@Value(value = "${spring.kafka.producer.bootstrap-servers}")
private String bootstrapAddress;
@Value(value = "${topic.name.producer}")
private String topicName;
@Bean
public NewTopic generalTopic() {
return TopicBuilder.name(topicName)
.partitions(3)
.replicas(1)
.build();
}
@Bean
public KafkaAdmin kafkaAdmin()
{
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
}
Application.Properties 文件:
server.port=${SERVER_PORT}
# Producer properties
spring.kafka.producer.bootstrap-servers=kafka:9092
#spring.kafka.producer.bootstrap-servers=172.21.0.2:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.group-id=group-1
topic.name.producer=test
# Common Kafka Properties
auto.create.topics.enable=true
我的消费者项目:
@Service
public class KafkaConsumer {
@Value("${topic.name.consumer")
private String topicName;
@KafkaListener(topics = "${topic.name.consumer}", groupId = "group-1")
public void consumeLinks(String word) throws InterruptedException {
System.out.println(word);
Thread.sleep(5000);
}
}
应用程序属性文件:
#server.port=8081
server.port=${SERVER_PORT}
# Producer properties
spring.kafka.consumer.bootstrap-servers=kafka:9092
#spring.kafka.consumer.bootstrap-servers=172.21.0.2:9092
#spring.kafka.consumer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
#spring.kafka.consumer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.max-poll-records=1
spring.kafka.consumer.group-id=group-1
topic.name.consumer=test
spring.kafka.consumer.auto-offset-reset=earliest
# Common Kafka Properties
auto.create.topics.enable=true
我已经尝试 google 但没有找到任何解决方案,或者我没有问题不明白如何解决。是否有人可以告诉我缺少什么或有一个 link 页面供傻瓜如何解决?
您的主题有 3 个分区。除非您只使用一个分区,否则将无法保证顺序。更具体地说,数据仅在分区内排序;每个消费者正在消费在其已分配分区.
中订购的数据要显示此内容,请尝试
@KafkaListener(topics = "${topic.name.consumer}", groupId = "group-1")
public void consumeLinks(
@Payload String word,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(
"Received Message: " + word
+ " from partition: " + partition);
如果你使用一个分区,那意味着你在那个消费者组中只能有一个消费者。