Consumer application causing duplicates while reading records from multiple partitions
I am trying to consume records from a stream using spring-kafka. The records use an Avro schema. Since the topic has 2 partitions, I set the Kafka concurrency to 2 so the partitions are consumed in parallel. However, this seems to be causing a problem.
I log each record received from the partitions before processing it, to make sure we have no duplicates (the same key appearing in different partitions).
Configuration:
@Bean
public ConsumerFactory<String, GenericRecord> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, MAX_POLL_INTERVAL);
    props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SSL_PROTOCOL);
    props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, SSL_TRUSTSTORE_LOCATION_FILE_NAME);
    props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, SSL_TRUSTSTORE_SECURE);
    props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, SSL_KEYSTORE_LOCATION_FILE_NAME);
    props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, SSL_KEYSTORE_SECURE);
    props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, SSL_KEY_SECURE);
    return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(KAFKA_CONCURRENCY);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); // manual immediate commit
    //factory.getContainerProperties().setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
    return factory;
}
Code:
@KafkaListener(topics = "${app.topic}", groupId = "${app.group_id_config}")
public void run(ConsumerRecord<String, GenericRecord> record, Acknowledgment acknowledgement) throws Exception {
    try {
        //System.out.println(record);
        if (record.value().get("enrollmentType").toString().matches("ACH|VCP|CHK")) {
            prov_tin_number = record.value().get("providerTinNumber").toString();
            //prov_tin_type = record.value().get("providerTINType").toString();
            enroll_type = record.value().get("enrollmentType").toString();
            vcp_prov_choice_ind = record.value().get("vcpProvChoiceInd").toString();
            error_flag = "";
        }
        System.out.println("coming from stream :" + prov_tin_number + " into offset " + record.offset() + " and partition " + record.partition());
        acknowledgement.acknowledge();
    } catch (Exception ex) {
        System.out.println(record);
        System.out.println(ex.getMessage());
    }
}
Code output:
Example:
coming from stream :018601027 into offset 500428 and partition 0
coming from stream :018601027 into offset 499553 and partition 1
From the above output, it appears the same record lands at different offsets and partitions, causing duplicates on the consumer side. However, that is not actually the case; when I read the same records from the command line, I get the following output:
root@fast-data-dev bin $ kafka-avro-console-consumer --topic kaas.pe.enrollment.csp.ts2 --bootstrap-server kaas-test-ctc-a.optum.com:443 --consumer.config /data/test/client-test-ssl.properties --partition 1 --offset 499553 --property schema.registry.url="http://kaas-test-schema-registry.com" --max-messages 1
{"providerTinNumber":"018601027","providerTINType":"TIN","enrollmentType":"ACH","vcpProvChoiceInd":{"string":"null"},"usrDefFld1":null,"usrDefFld2":null,"usrDefFld3":null,"usrDefFld4":null,"usrDefFld5":null,"usrDefFld6":null,"usrDefFld7":null,"usrDefFld8":null,"usrDefFld9":null,"usrDefFld10":null}
Processed a total of 1 messages
root@fast-data-dev bin $ kafka-avro-console-consumer --topic kaas.pe.enrollment.csp.ts2 --bootstrap-server kaas-test-ctc-a.optum.com:443 --consumer.config /data/test/client-test-ssl.properties --partition 0 --offset 500428 --property schema.registry.url="http://kaas-test-schema-registry.com" --max-messages 1
{"providerTinNumber":"024580061","providerTINType":"TIN","enrollmentType":"ACH","vcpProvChoiceInd":{"string":"null"},"usrDefFld1":null,"usrDefFld2":null,"usrDefFld3":null,"usrDefFld4":null,"usrDefFld5":null,"usrDefFld6":null,"usrDefFld7":null,"usrDefFld8":null,"usrDefFld9":null,"usrDefFld10":null}
Processed a total of 1 messages
So we do in fact have different values at those offsets and partitions. Clearly something is wrong in my code, and it affects not just 1 record but multiple records.
Full Spring Boot log:
00:26:11.507 [restartedMain] INFO com.emerald.peconsumer.ApplicationRun - Started ApplicationRun in 2.896 seconds (JVM running for 12.571)
00:26:13.357 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Cluster ID: 6cbv7QOaSW6j1vXrOCE4jA
00:26:13.357 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Cluster ID: 6cbv7QOaSW6j1vXrOCE4jA
00:26:13.359 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Discovered group coordinator apslp1563.uhc.com:9093 (id: 2147483574 rack: null)
00:26:13.359 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Discovered group coordinator apslp1563.uhc.com:9093 (id: 2147483574 rack: null)
00:26:13.521 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] (Re-)joining group
00:26:13.521 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] (Re-)joining group
00:26:15.196 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] (Re-)joining group
00:26:15.197 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] (Re-)joining group
00:26:30.504 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Finished assignment for group at generation 77: {consumer-csp-prov-emerald-test-2-d2f920dc-a52a-4ed4-aa0f-1e3ef268a4fc=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Assignment@1f9e0b89, consumer-csp-prov-emerald-test-1-242f32f2-b823-4946-be1f-a6c584a0f3ce=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Assignment@91e5bc9}
00:26:30.815 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Successfully joined group with generation 77
00:26:30.815 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Successfully joined group with generation 77
00:26:30.818 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Adding newly assigned partitions: kaas.pe.enrollment.csp.ts2-0
00:26:30.818 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Adding newly assigned partitions: kaas.pe.enrollment.csp.ts2-1
00:26:31.133 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Setting offset for partition kaas.pe.enrollment.csp.ts2-0 to the committed offset FetchPosition{offset=500428, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=apslp1559.uhc.com:9093 (id: 69 rack: null), epoch=37}}
00:26:31.133 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Setting offset for partition kaas.pe.enrollment.csp.ts2-1 to the committed offset FetchPosition{offset=499553, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=apslp1562.uhc.com:9093 (id: 72 rack: null), epoch=36}}
00:26:31.134 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.springframework.kafka.listener.KafkaMessageListenerContainer - csp-prov-emerald-test: partitions assigned: [kaas.pe.enrollment.csp.ts2-0]
00:26:31.134 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.springframework.kafka.listener.KafkaMessageListenerContainer - csp-prov-emerald-test: partitions assigned: [kaas.pe.enrollment.csp.ts2-1]
coming from stream :018601027 into offset 500428 and partition 0
coming from stream :018601027 into offset 499553 and partition 1
Update:
I tried printing the records again, once with 1 consumer thread and once with 2 consumer threads.
Output from 2 consumer threads:
You can observe the randomness of the behavior.
Duplicate records: the same record appears in 2 different partitions.
First run:
coming from stream :018601027 into offset 500428 and partition 0 <-- duplicate
coming from stream :018601027 into offset 499553 and partition 1 <-- duplicate
coming from stream :072461600 into offset 500429 and partition 0
coming from stream :027400400 into offset 499554 and partition 1
coming from stream :090341206 into offset 500430 and partition 0 <-- duplicate
coming from stream :090341206 into offset 499555 and partition 1 <-- duplicate
coming from stream :113423162 into offset 500431 and partition 0
coming from stream :052407795 into offset 499556 and partition 1
coming from stream :086586131 into offset 499557 and partition 1
coming from stream :113424057 into offset 500432 and partition 0
coming from stream :090443768 into offset 499558 and partition 1
coming from stream :024580061 into offset 500433 and partition 0
coming from stream :072461600 into offset 500434 and partition 0
coming from stream :090465976 into offset 499559 and partition 1
coming from stream :094324212 into offset 499560 and partition 1
coming from stream :090341206 into offset 500435 and partition 0
coming from stream :107422748 into offset 499561 and partition 1
coming from stream :113423162 into offset 500436 and partition 0
coming from stream :004582777 into offset 499562 and partition 1
coming from stream :113424057 into offset 500437 and partition 0
coming from stream :018601027 into offset 499563 and partition 1
coming from stream :027400400 into offset 499564 and partition 1
coming from stream :031866294 into offset 499565 and partition 1
coming from stream :052407795 into offset 499566 and partition 1
coming from stream :086586131 into offset 499567 and partition 1
coming from stream :090443768 into offset 499568 and partition 1
coming from stream :090465976 into offset 499569 and partition 1
coming from stream :094324212 into offset 499570 and partition 1
coming from stream :107422748 into offset 499571 and partition 1
coming from stream :272626998 into offset 499572 and partition 1
Second run:
coming from stream :024580061 into offset 499553 and partition 1 <-- duplicate
coming from stream :024580061 into offset 500428 and partition 0 <-- duplicate
coming from stream :027400400 into offset 499554 and partition 1
coming from stream :072461600 into offset 500429 and partition 0
coming from stream :090341206 into offset 500430 and partition 0
coming from stream :031866294 into offset 499555 and partition 1
coming from stream :113423162 into offset 500431 and partition 0
coming from stream :052407795 into offset 499556 and partition 1
coming from stream :113424057 into offset 500432 and partition 0
coming from stream :086586131 into offset 499557 and partition 1
coming from stream :090443768 into offset 499558 and partition 1
coming from stream :024580061 into offset 500433 and partition 0
coming from stream :072461600 into offset 500434 and partition 0
coming from stream :090465976 into offset 499559 and partition 1
coming from stream :090341206 into offset 500435 and partition 0
coming from stream :094324212 into offset 499560 and partition 1
coming from stream :113423162 into offset 500436 and partition 0 <-- duplicate
coming from stream :113423162 into offset 499561 and partition 1 <-- duplicate
coming from stream :004582777 into offset 499562 and partition 1
coming from stream :113424057 into offset 500437 and partition 0
coming from stream :018601027 into offset 499563 and partition 1
coming from stream :027400400 into offset 499564 and partition 1
coming from stream :031866294 into offset 499565 and partition 1
coming from stream :052407795 into offset 499566 and partition 1
coming from stream :086586131 into offset 499567 and partition 1
coming from stream :090443768 into offset 499568 and partition 1
coming from stream :090465976 into offset 499569 and partition 1
coming from stream :094324212 into offset 499570 and partition 1
coming from stream :107422748 into offset 499571 and partition 1
coming from stream :272626998 into offset 499572 and partition 1
Output from 1 consumer thread:
With 1 consumer thread there are no duplicates; the records are printed as expected. Does this mean the spring-kafka concurrency parameter is unreliable? And if so, how do I scale the consumer application to process records in parallel?
coming from stream :018601027 into offset 499553 and partition 1
coming from stream :027400400 into offset 499554 and partition 1
coming from stream :031866294 into offset 499555 and partition 1
coming from stream :052407795 into offset 499556 and partition 1
coming from stream :086586131 into offset 499557 and partition 1
coming from stream :090443768 into offset 499558 and partition 1
coming from stream :090465976 into offset 499559 and partition 1
coming from stream :094324212 into offset 499560 and partition 1
coming from stream :107422748 into offset 499561 and partition 1
coming from stream :004582777 into offset 499562 and partition 1
coming from stream :018601027 into offset 499563 and partition 1
coming from stream :027400400 into offset 499564 and partition 1
coming from stream :031866294 into offset 499565 and partition 1
coming from stream :052407795 into offset 499566 and partition 1
coming from stream :086586131 into offset 499567 and partition 1
coming from stream :090443768 into offset 499568 and partition 1
coming from stream :090465976 into offset 499569 and partition 1
coming from stream :094324212 into offset 499570 and partition 1
coming from stream :107422748 into offset 499571 and partition 1
coming from stream :272626998 into offset 499572 and partition 1
coming from stream :024580061 into offset 500428 and partition 0
coming from stream :072461600 into offset 500429 and partition 0
coming from stream :090341206 into offset 500430 and partition 0
coming from stream :113423162 into offset 500431 and partition 0
coming from stream :113424057 into offset 500432 and partition 0
coming from stream :024580061 into offset 500433 and partition 0
coming from stream :072461600 into offset 500434 and partition 0
coming from stream :090341206 into offset 500435 and partition 0
coming from stream :113423162 into offset 500436 and partition 0
coming from stream :113424057 into offset 500437 and partition 0
The code you are calling from the listener most likely has a thread-safety problem; when using multiple threads, you cannot use fields unless you protect them with synchronization logic.
For example:
public class NotThreadSafe {

    String someValue;

    void processRecord(ConsumerRecord<?, ?> record) {
        this.someValue = record.value().toString();
        doSomeMoreWork();
    }

    void doSomeMoreWork() {
        // ...
        doSomethingWith(this.someValue);
    }
}
With multiple threads, one thread may see someValue that was written by another thread.
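The usual fix is to keep the per-record state in local variables, which are confined to the calling thread. A minimal sketch of the difference (plain Java with no Kafka dependencies; the class and method names here are hypothetical, not from your code):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadConfinementDemo {

    // Thread-safe: everything the method needs lives in locals/parameters,
    // so concurrent invocations cannot overwrite each other's data.
    static String processRecord(String providerTin, int partition) {
        String provTinNumber = providerTin; // local variable, confined to this thread
        return "coming from stream :" + provTinNumber + " on partition " + partition;
    }

    public static void main(String[] args) throws Exception {
        // Simulate the two listener container threads.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Future<String> p0 = pool.submit(() -> processRecord("024580061", 0));
        Future<String> p1 = pool.submit(() -> processRecord("018601027", 1));
        // Each thread reports only its own value; no cross-thread "duplicates".
        System.out.println(p0.get());
        System.out.println(p1.get());
        pool.shutdown();
    }
}
```

Applied to your listener, that means building the log line from locals inside `run(...)` instead of assigning to fields like `prov_tin_number`; with a field, thread 2 can overwrite the value before thread 1 prints it, which is exactly the "duplicate" you observed.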