spring batch KafkaConsumer 多线程访问不安全
spring batch KafkaConsumer is not safe for multi-threaded access
我是孢子批次的新手。我有需要读取 kafka 流和过滤数据并保存在数据库中的要求。为此,我对 KafkaItemReader 使用了 spring 批处理。当我 运行 在 spring 作业中启动多个作业时,它给出 java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access 错误。这一次它只有 运行 最后一份工作。
这是 spring 批处理配置。
@Autowired
TaskExecutor taskExecutor;
@Autowired
JobRepository jobRepository;
@Bean
KafkaItemReader<Long, Event> kafkaItemReader() {
Properties props = new Properties();
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
props.putAll(this.properties.buildConsumerProperties());
return new KafkaItemReaderBuilder<Long, Event>()
.partitions(0)
.consumerProperties(props)
.name("event-reader")
.saveState(true)
.topic(topicName)
.build();
}
@Bean
public TaskExecutor taskExecutor(){
SimpleAsyncTaskExecutor asyncTaskExecutor=new SimpleAsyncTaskExecutor("spring_batch");
asyncTaskExecutor.setConcurrencyLimit(5);
return asyncTaskExecutor;
}
@Bean(name = "JobLauncher")
public JobLauncher simpleJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(taskExecutor);
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
并且有启动新作业的控制器端点。这是我必须使用 start new Job
的方式
@Autowired
@Qualifier("JobLauncher")
private JobLauncher jobLauncher;
Map<String, JobParameter> items = new HashMap<>();
items.put("userId", new JobParameter("UserInputId"));
JobParameters paramaters = new JobParameters(items);
try {
jobLauncher.run(job, paramaters);
} catch (Exception e) {
e.printStackTrace();
}
我看到KafkaItemReader不是线程safe。我想知道这种方式是否正确,或者有什么方法可以在多线程 spring 批处理环境中读取 kafka 流。
感谢和问候
KafkaItemReader
被记录为非线程安全的,这里是其 Javadoc 的摘录:
Since KafkaConsumer is not thread-safe, this reader is not thread-safe.
所以在多线程环境下使用是不正确的,不符合文档。您可以做的是每个分区使用 reader。
根据 spring documentation, it uses KafkaConsumer; which itself is not threadsafe as per their detailed documentation .
请查看您是否可以使用该文档中提到的任何方法(即解耦或每个线程单个使用者)。在您的示例中,您可能需要为 taskexecutor 使用单独的处理程序(如果您遵循解耦方法)。
我是孢子批次的新手。我有需要读取 kafka 流和过滤数据并保存在数据库中的要求。为此,我对 KafkaItemReader 使用了 spring 批处理。当我 运行 在 spring 作业中启动多个作业时,它给出 java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access 错误。这一次它只有 运行 最后一份工作。
这是 spring 批处理配置。
@Autowired
TaskExecutor taskExecutor;
@Autowired
JobRepository jobRepository;
@Bean
KafkaItemReader<Long, Event> kafkaItemReader() {
Properties props = new Properties();
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
props.putAll(this.properties.buildConsumerProperties());
return new KafkaItemReaderBuilder<Long, Event>()
.partitions(0)
.consumerProperties(props)
.name("event-reader")
.saveState(true)
.topic(topicName)
.build();
}
@Bean
public TaskExecutor taskExecutor(){
SimpleAsyncTaskExecutor asyncTaskExecutor=new SimpleAsyncTaskExecutor("spring_batch");
asyncTaskExecutor.setConcurrencyLimit(5);
return asyncTaskExecutor;
}
@Bean(name = "JobLauncher")
public JobLauncher simpleJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(taskExecutor);
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
并且有启动新作业的控制器端点。这是我必须使用 start new Job
的方式 @Autowired
@Qualifier("JobLauncher")
private JobLauncher jobLauncher;
Map<String, JobParameter> items = new HashMap<>();
items.put("userId", new JobParameter("UserInputId"));
JobParameters paramaters = new JobParameters(items);
try {
jobLauncher.run(job, paramaters);
} catch (Exception e) {
e.printStackTrace();
}
我看到KafkaItemReader不是线程safe。我想知道这种方式是否正确,或者有什么方法可以在多线程 spring 批处理环境中读取 kafka 流。 感谢和问候
KafkaItemReader
被记录为非线程安全的,这里是其 Javadoc 的摘录:
Since KafkaConsumer is not thread-safe, this reader is not thread-safe.
所以在多线程环境下使用是不正确的,不符合文档。您可以做的是每个分区使用 reader。
根据 spring documentation, it uses KafkaConsumer; which itself is not threadsafe as per their detailed documentation .
请查看您是否可以使用该文档中提到的任何方法(即解耦或每个线程单个使用者)。在您的示例中,您可能需要为 taskexecutor 使用单独的处理程序(如果您遵循解耦方法)。