Spring Kafka, manual committing in different threads
Hello, colleagues. I am using Spring Kafka 2.2.5 and I have a listener:
@KafkaListener(topics = "${kafka.execution-task-topic}", containerFactory = "executionTaskObjectContainerFactory")
public void protocolEventsHandle(ExecutionTask executionTask,
                                 Acknowledgment acknowledgment,
                                 @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                                 @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                 @Header(KafkaHeaders.OFFSET) long offset) {
    ResponseEntity<String> stringResponseEntity = airflowRestRunner.startDag(executionTask);
    JSONObject body = new JSONObject(stringResponseEntity.getBody());
    String message = body.getString("message");
    String runId = messageParser.getRunId(message);
    ExecutionTaskMessageInfo messageInfo = new ExecutionTaskMessageInfo(offset, partition, false, acknowledgment);
    kafkaAcknowledgeObject.putMessageInfo(messageInfo, partition);
    this.executorService.submit(kafkaAlertProducer.produceMessageAfterTaskSuccess(runId, executionTask, messageInfo));
}
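ExecutionTaskMessageInfo itself is not shown in the question; a minimal sketch consistent with how it is used here and in UPDATE 1 below (constructed with offset, partition, a completed flag, and the Acknowledgment; kept in a TreeSet, so ordered by offset) might look like this:

// Hypothetical reconstruction - the real class is not shown in the question.
import org.springframework.kafka.support.Acknowledgment;

public class ExecutionTaskMessageInfo implements Comparable<ExecutionTaskMessageInfo> {

    private final long offset;
    private final int partition;
    private volatile boolean completed;
    private final Acknowledgment acknowledgment;

    public ExecutionTaskMessageInfo(long offset, int partition, boolean completed, Acknowledgment acknowledgment) {
        this.offset = offset;
        this.partition = partition;
        this.completed = completed;
        this.acknowledgment = acknowledgment;
    }

    public boolean isCompleted() {
        return this.completed;
    }

    // Presumably called by the worker thread once the task has finished.
    public void markCompleted() {
        this.completed = true;
    }

    public Acknowledgment getAcknowledgment() {
        return this.acknowledgment;
    }

    // Order by offset so a TreeSet yields the lowest unacknowledged offset first.
    @Override
    public int compareTo(ExecutionTaskMessageInfo other) {
        return Long.compare(this.offset, other.offset);
    }
}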
I do some operations and, if they succeed, I use the Acknowledgment interface to commit the offset.

I have a problem: while the computation is running in the spawned thread, the listener reads messages from the same offset again, so when I then try to acknowledge the offset, the application crashes.

What is the best practice for working with Kafka concurrently? I can receive up to 10 messages in parallel, and I only need to commit them after the computation finishes.
UPDATE 1

I store all the messages coming from Kafka in a map: the key is the partition number, and the value is a special model class that holds a reference to the Acknowledgment it needs:
@Data
@NoArgsConstructor
@AllArgsConstructor
public abstract class KafkaAcknowledgeObject<T extends Comparable> {

    protected ConcurrentHashMap<Integer, TreeSet<T>> hashMap = new ConcurrentHashMap<>();

    public abstract void doAck();

    public void putMessageInfo(T message, int partition) {
        if (hashMap.containsKey(partition)) {
            hashMap.get(partition).add(message);
        } else {
            TreeSet<T> messageInfos = new TreeSet<>();
            messageInfos.add(message);
            hashMap.put(partition, messageInfos);
        }
    }
}
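Note that the containsKey/put sequence above is not atomic, and TreeSet is not thread-safe, so the listener thread and the worker threads can race on the same partition's set. A sketch of a thread-safe variant, assuming the same overall shape (the SafeKafkaAcknowledgeObject name is illustrative):

import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative thread-safe variant, not the code from the question.
public abstract class SafeKafkaAcknowledgeObject<T extends Comparable<T>> {

    // SortedSet instead of TreeSet so the synchronized wrapper fits the value type.
    protected final ConcurrentHashMap<Integer, SortedSet<T>> hashMap = new ConcurrentHashMap<>();

    public abstract void doAck();

    public void putMessageInfo(T message, int partition) {
        // computeIfAbsent makes create-if-missing atomic; the synchronized wrapper
        // guards the set itself, since worker threads may remove entries concurrently.
        hashMap.computeIfAbsent(partition,
                p -> Collections.synchronizedSortedSet(new TreeSet<>()))
            .add(message);
    }
}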
After the computation I call doAck(), for example:
@Override
public void doAck() {
    for (TreeSet<ExecutionTaskMessageInfo> messageInfoTreeSet : super.hashMap.values()) {
        checkHandledOffsets(messageInfoTreeSet);
    }
}

private void checkHandledOffsets(TreeSet<ExecutionTaskMessageInfo> messageInfoTreeSet) {
    ExecutionTaskMessageInfo first = getFirstMessageInfo(messageInfoTreeSet);
    if (first.isCompleted()) {
        first.getAcknowledgment().acknowledge();
        messageInfoTreeSet.remove(first);
        checkHandledOffsets(messageInfoTreeSet);
    }
}

private ExecutionTaskMessageInfo getFirstMessageInfo(TreeSet<ExecutionTaskMessageInfo> messageInfoTreeSet) {
    Iterator<ExecutionTaskMessageInfo> iterator = messageInfoTreeSet.iterator();
    return iterator.next();
}
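One caveat with the version above: getFirstMessageInfo calls iterator.next() without checking hasNext(), so as soon as the recursion empties a partition's set, the next call throws NoSuchElementException. An iterative sketch with an empty-set guard, keeping the same names:

// Illustrative replacement for checkHandledOffsets/getFirstMessageInfo.
// Acknowledges completed messages in offset order and stops at the first pending one.
private void checkHandledOffsets(TreeSet<ExecutionTaskMessageInfo> messageInfoTreeSet) {
    while (!messageInfoTreeSet.isEmpty()) {
        ExecutionTaskMessageInfo first = messageInfoTreeSet.first();
        if (!first.isCompleted()) {
            break; // later offsets must wait for this one
        }
        first.getAcknowledgment().acknowledge();
        messageInfoTreeSet.remove(first);
    }
}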
What you are doing should be fine; I just tested a similar arrangement and it works fine for me...
@SpringBootApplication
public class So56190029Application {

    public static void main(String[] args) {
        SpringApplication.run(So56190029Application.class, args);
    }

    private final ExecutorService exec = Executors.newSingleThreadExecutor();

    private final AtomicInteger count = new AtomicInteger();

    @KafkaListener(id = "so56190029", topics = "so56190029")
    public void listen(String in, Acknowledgment ack) {
        this.exec.execute(runner(in, ack));
    }

    private Runnable runner(String payload, Acknowledgment ack) {
        return () -> {
            System.out.println(payload);
            if (this.count.incrementAndGet() % 3 == 0) {
                System.out.println("acking");
                ack.acknowledge();
            }
        };
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<?, String> template) {
        return args -> IntStream.range(0, 6).forEach(i -> template.send("so56190029", "foo" + i));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.getContainerProperties().setCommitLogLevel(Level.INFO);
        return factory;
    }

}
and
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.max.poll.records=3
spring.kafka.listener.ack-mode=MANUAL
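The ack-mode property has a Java-config equivalent on the container factory, e.g. (an illustration, assuming the factory bean shown above; not part of the original answer):

// Illustrative: the Java-config equivalent of spring.kafka.listener.ack-mode=MANUAL,
// set inside the kafkaListenerContainerFactory bean shown above.
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);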
and the output:
foo0
foo1
foo2
acking
foo3
foo4
foo5
acking
2019-05-17 14:46:28.790 INFO 62429 --- [o56190029-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {so56190029-0=OffsetAndMetadata{offset=36, leaderEpoch=null, metadata=''}}