Post to topic and manual commit in Spring Kafka
We are working on a requirement where a Spring Kafka consumer posts to a topic and should then commit the offset manually.

One scenario we are considering is what happens when the consumer fails after posting the message to the topic but before manually committing the offset. In that case the application would reprocess the message and post it again, producing a duplicate message in the topic.

Is there a way to make both activities part of the same TransactionManager, so that they either both succeed or both fail?
Configuration
@Bean
public ProducerFactory<String, User> producerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    DefaultKafkaProducerFactory<String, User> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(config);
    defaultKafkaProducerFactory.setTransactionIdPrefix("trans");
    return defaultKafkaProducerFactory;
}

@Bean
public KafkaTemplate<String, User> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

@Bean
public KafkaTransactionManager<String, User> transactionManager() {
    KafkaTransactionManager<String, User> transactionManager = new KafkaTransactionManager<>(producerFactory());
    transactionManager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS);
    transactionManager.setNestedTransactionAllowed(true);
    return transactionManager;
}

/**
 * New configuration for the consumerFactory added.
 */
@Bean
public ConsumerFactory<String, User> consumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "firstTopic-group");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(User.class));
}

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, User>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    factory.getContainerProperties().setTransactionManager(transactionManager());
    factory.setRetryTemplate(kafkaRetry());
    factory.setStatefulRetry(true);
    factory.setErrorHandler(getErrorHandler());
    factory.setRecoveryCallback(retryContext -> {
        // Implement the logic to decide the action after all retries are over.
        ConsumerRecord<?, ?> consumerRecord = (ConsumerRecord<?, ?>) retryContext.getAttribute("record");
        System.out.println("Recovery is called for message " + consumerRecord.value());
        return Optional.empty();
    });
    return factory;
}

public RetryTemplate kafkaRetry() {
    RetryTemplate retryTemplate = new RetryTemplate();
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(60 * 1000);
    backOffPolicy.setMultiplier(3);
    backOffPolicy.setMaxInterval(4 * 60 * 1000); // original 25 * 60 * 1000
    retryTemplate.setBackOffPolicy(backOffPolicy);
    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(4);
    retryTemplate.setRetryPolicy(retryPolicy);
    return retryTemplate;
}

public SeekToCurrentErrorHandler getErrorHandler() {
    return new SeekToCurrentErrorHandler() {
        @Override
        public void handle(Exception thrownException,
                           List<ConsumerRecord<?, ?>> records,
                           Consumer<?, ?> consumer,
                           MessageListenerContainer container) {
            if (!records.isEmpty()) {
                ConsumerRecord<?, ?> record = records.get(0);
                String topic = record.topic();
                long offset = record.offset();
                int partition = record.partition();
                if (thrownException instanceof DeserializationException) {
                    System.out.println("------1111------ deserialization exception");
                } else {
                    System.out.println("------xxxx------ other exception; seeking back to the failed record");
                    consumer.seek(new TopicPartition(topic, partition), offset);
                }
            } else {
                System.out.println("------4444------ records list is empty");
            }
        }
    };
}
Kafka listener
@Autowired
KafkaTemplate<String, User> kafkaTemplate;

@KafkaListener(topics = "firstTopic", groupId = "firstTopic-group")
@Transactional
public void onCustomerMessage(User user, Acknowledgment acknowledgment) throws Exception {
    System.out.println("NOT In transaction");
    kafkaTemplate.executeInTransaction(t -> {
        System.out.println("---------------------->");
        int number = (int) (Math.random() * 10);
        t.send("secondtopic", user);
        if (number % 5 == 0) {
            throw new RuntimeException("fail");
        }
        acknowledgment.acknowledge();
        return true;
    });
    System.out.println("*** exit ***");
}
Error log
2020-05-28 15:52:53.597 ERROR 112469 --- [nio-8080-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record] with root cause
java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
at org.springframework.util.Assert.state(Assert.java:73) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:394) ~[spring-kafka-2.3.7.RELEASE.jar:2.3.7.RELEASE]
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:216) ~[spring-kafka-2.3.7.RELEASE.jar:2.3.7.RELEASE]
at com.barade.sandesh.springKafka.UserResource.postComments(UserResource.java:26) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_252]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_252]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_252]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_252]
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190) ~[spring-web-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138) ~[spring-web-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:105) ~[spring-webmvc-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:879) ~[spring-webmvc-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:793) ~[spring-webmvc-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1040) ~[spring-webmvc-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943) ~[spring-webmvc-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909) ~[spring-webmvc-5.2.5.RELEASE.jar:5.2.5.RELEASE]
UserResource
@RestController
@RequestMapping("accounts")
public class UserResource {

    @Autowired
    KafkaTemplate<String, User> kafkaTemplate;

    @PostMapping("/users")
    public String postComments(@RequestParam("firstName") final String firstName,
                               @RequestParam("lastName") final String lastName,
                               @RequestParam("userName") final String userName) {
        kafkaTemplate.send("firstTopic", new User(firstName, lastName, userName));
        return "Message sent to the Error queue";
    }
}
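Note: the stack trace above points at this controller (UserResource.postComments, line 26), not at the listener. Once the producer factory has a transactionIdPrefix, every template send must run inside a transaction, including this HTTP-triggered one, where no listener container has started a transaction for you. A sketch of one remedy that the exception message itself suggests (assuming the beans shown in the question):

```java
@PostMapping("/users")
public String postComments(@RequestParam("firstName") final String firstName,
                           @RequestParam("lastName") final String lastName,
                           @RequestParam("userName") final String userName) {
    // A transactional KafkaTemplate may only send inside a transaction,
    // so start a local one for this web request.
    kafkaTemplate.executeInTransaction(t ->
            t.send("firstTopic", new User(firstName, lastName, userName)));
    return "Message sent to firstTopic";
}
```

Alternatively, annotating the controller method with @Transactional (backed by the KafkaTransactionManager) would have the same effect.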
Yes; see Transactions.

The container starts the transaction, and any KafkaTemplate send operations performed by the listener participate in that transaction; the container sends the offsets to the transaction and commits it (as long as the listener exits normally).

There is no need to "manually" commit the offsets.
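To make that concrete, here is a minimal sketch of the listener-side setup the answer describes. The bean and topic names are taken from the question; treat this as an illustration of the pattern, not a drop-in fix:

```java
// Sketch: container-managed Kafka transaction (assumes the transactional
// producerFactory, transactionManager, and consumerFactory beans from the question).
@Bean
public ConcurrentKafkaListenerContainerFactory<String, User> kafkaListenerContainerFactory(
        ConsumerFactory<String, User> consumerFactory,
        KafkaTransactionManager<String, User> transactionManager) {
    ConcurrentKafkaListenerContainerFactory<String, User> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // The container begins a Kafka transaction before invoking the listener.
    factory.getContainerProperties().setTransactionManager(transactionManager);
    return factory;
}

@KafkaListener(topics = "firstTopic", groupId = "firstTopic-group")
public void onCustomerMessage(User user) {
    // Runs inside the container-started transaction; this send participates in it.
    kafkaTemplate.send("secondtopic", user);
    // If this method throws, the send and the offset are rolled back together;
    // if it returns normally, the container sends the offset to the transaction
    // and commits, so the outbound record and the offset succeed or fail as one.
}
```

With this arrangement, the executeInTransaction call, the Acknowledgment parameter, and the MANUAL_IMMEDIATE ack mode are all unnecessary: a crash before the commit rolls back both the send and the offset, and the record is simply redelivered.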
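One related detail, not covered in the answer but worth knowing: transactional sends only prevent duplicates end to end if the consumers of secondtopic read with isolation.level=read_committed. The Kafka default is read_uncommitted, which would expose records from aborted transactions. A sketch of the extra consumer property (the other settings mirror the question's consumerFactory):

```java
// Hypothetical consumer config for readers of "secondtopic".
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Skip records belonging to aborted (or still-open) transactions.
config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
```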