spring kafka listener中消息的处理策略
processing strategy of message in spring kafka listener
只是想确定消息是否以正确的方式处理。当监听器接收到消息时,它将始终由一个新线程处理(将处理器 bean 定义为原型)。这个实现是否正确? (我已经考虑过监听器不是线程安全的,因此使用了 bean 的原型作用域来处理消息)
(输入:TestTopic- 5 个分区 - 1 个消费者)或(输入:TestTopic- 5 个分区 - 5 个消费者)
public class EventListener {
@Autowired
private EventProcessor eventProcessor;
@KafkaListener(topics = "TestTopic", containerFactory = "kafkaListenerContainerFactory",
autoStartup = "true")
public void onMessage(
@Payload List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment acknowledgment) {
eventProcessor.processAndAcknowledgeBatchMessages(consumerRecords, acknowledgment);
}
}
//事件处理器
@Slf4j
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@NoArgsConstructor
@SuppressWarnings("unused")
public class EventProcessorImpl implements EventProcessor {
@Autowired
private KafkaProducerTemplate kafkaProducerTemplate;
@Autowired
private ObjectMapper localObjectMapper;
@Autowired
private Dao dao;
public void processAndAcknowledgeBatchMessages(
List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment acknowledgment) {
long start = System.currentTimeMillis();
consumerRecords.forEach( consumerRecord -> {
Event event = localObjectMapper.readValue(consumerRecord.value(), Event.class);
dao.save(process(event));
});
acknowledgment.acknowledge();
}
}
不,这是不正确的;你不应该在另一个线程上执行;它会导致提交偏移量和错误处理的问题。
此外,使 EventProcessorImpl
成为原型 bean 也无济于事。这只是意味着每次引用 bean 时都会使用一个新实例。
因为它是 @Autowired
,所以它只在初始化期间被引用一次。要为每个请求获取一个新实例,您需要每次都在应用程序上下文中调用 getBean()
。
最好使您的代码线程安全。
编辑
有(至少)几种方法来处理原型作用域中定义的非线程安全服务。
- 使用 ThreadLocal:
@SpringBootApplication
public class So68447863Application {
public static void main(String[] args) {
SpringApplication.run(So68447863Application.class, args);
}
private static final ThreadLocal<NotThreadSafeService> SERVICES = new ThreadLocal<>();
@Autowired
ApplicationContext context;
@KafkaListener(id = "so68447863", topics = "so68447863", concurrency = "5")
void listen(String in) {
NotThreadSafeService service = SERVICES.get();
if (service == null) {
service = this.context.getBean(NotThreadSafeService.class);
SERVICES.set(service);
}
service.process(in);
}
@EventListener
void removeService(ConsumerStoppedEvent event) {
System.out.println("Consumer stopped; removing TL");
SERVICES.remove();
}
@Bean
NewTopic topic() {
return TopicBuilder.name("so68447863").partitions(10).replicas(1).build();
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
NotThreadSafeService service() {
return new NotThreadSafeService();
}
}
class NotThreadSafeService {
void process(String msg) {
System.out.println(msg + " processed by " + this);
}
}
- 使用实例池。
@SpringBootApplication
public class So68447863Application {
public static void main(String[] args) {
SpringApplication.run(So68447863Application.class, args);
}
private static final BlockingQueue<NotThreadSafeService> SERVICES = new LinkedBlockingQueue<>();
@Autowired
ApplicationContext context;
@KafkaListener(id = "so68447863", topics = "so68447863", concurrency = "5")
void listen(String in) {
NotThreadSafeService service = SERVICES.poll();
if (service == null) {
service = this.context.getBean(NotThreadSafeService.class);
}
try {
service.process(in);
}
finally {
SERVICES.add(service);
}
}
@Bean
NewTopic topic() {
return TopicBuilder.name("so68447863").partitions(10).replicas(1).build();
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
NotThreadSafeService service() {
return new NotThreadSafeService();
}
}
class NotThreadSafeService {
void process(String msg) {
System.out.println(msg + " processed by " + this);
}
}
只是想确定消息是否以正确的方式处理。当监听器接收到消息时,它将始终由一个新线程处理(将处理器 bean 定义为原型)。这个实现是否正确? (我已经考虑过监听器不是线程安全的,因此使用了 bean 的原型作用域来处理消息)
(输入:TestTopic- 5 个分区 - 1 个消费者)或(输入:TestTopic- 5 个分区 - 5 个消费者)
public class EventListener {
@Autowired
private EventProcessor eventProcessor;
@KafkaListener(topics = "TestTopic", containerFactory = "kafkaListenerContainerFactory",
autoStartup = "true")
public void onMessage(
@Payload List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment acknowledgment) {
eventProcessor.processAndAcknowledgeBatchMessages(consumerRecords, acknowledgment);
}
}
//事件处理器
@Slf4j
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@NoArgsConstructor
@SuppressWarnings("unused")
public class EventProcessorImpl implements EventProcessor {
@Autowired
private KafkaProducerTemplate kafkaProducerTemplate;
@Autowired
private ObjectMapper localObjectMapper;
@Autowired
private Dao dao;
public void processAndAcknowledgeBatchMessages(
List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment acknowledgment) {
long start = System.currentTimeMillis();
consumerRecords.forEach( consumerRecord -> {
Event event = localObjectMapper.readValue(consumerRecord.value(), Event.class);
dao.save(process(event));
});
acknowledgment.acknowledge();
}
}
不,这是不正确的;你不应该在另一个线程上执行;它会导致提交偏移量和错误处理的问题。
此外,使 EventProcessorImpl
成为原型 bean 也无济于事。这只是意味着每次引用 bean 时都会使用一个新实例。
因为它是 @Autowired
,所以它只在初始化期间被引用一次。要为每个请求获取一个新实例,您需要每次都在应用程序上下文中调用 getBean()
。
最好使您的代码线程安全。
编辑
有(至少)几种方法来处理原型作用域中定义的非线程安全服务。
- 使用 ThreadLocal:
@SpringBootApplication
public class So68447863Application {
public static void main(String[] args) {
SpringApplication.run(So68447863Application.class, args);
}
private static final ThreadLocal<NotThreadSafeService> SERVICES = new ThreadLocal<>();
@Autowired
ApplicationContext context;
@KafkaListener(id = "so68447863", topics = "so68447863", concurrency = "5")
void listen(String in) {
NotThreadSafeService service = SERVICES.get();
if (service == null) {
service = this.context.getBean(NotThreadSafeService.class);
SERVICES.set(service);
}
service.process(in);
}
@EventListener
void removeService(ConsumerStoppedEvent event) {
System.out.println("Consumer stopped; removing TL");
SERVICES.remove();
}
@Bean
NewTopic topic() {
return TopicBuilder.name("so68447863").partitions(10).replicas(1).build();
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
NotThreadSafeService service() {
return new NotThreadSafeService();
}
}
class NotThreadSafeService {
void process(String msg) {
System.out.println(msg + " processed by " + this);
}
}
- 使用实例池。
@SpringBootApplication
public class So68447863Application {
public static void main(String[] args) {
SpringApplication.run(So68447863Application.class, args);
}
private static final BlockingQueue<NotThreadSafeService> SERVICES = new LinkedBlockingQueue<>();
@Autowired
ApplicationContext context;
@KafkaListener(id = "so68447863", topics = "so68447863", concurrency = "5")
void listen(String in) {
NotThreadSafeService service = SERVICES.poll();
if (service == null) {
service = this.context.getBean(NotThreadSafeService.class);
}
try {
service.process(in);
}
finally {
SERVICES.add(service);
}
}
@Bean
NewTopic topic() {
return TopicBuilder.name("so68447863").partitions(10).replicas(1).build();
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
NotThreadSafeService service() {
return new NotThreadSafeService();
}
}
class NotThreadSafeService {
void process(String msg) {
System.out.println(msg + " processed by " + this);
}
}