spring batch KafkaConsumer 多线程访问不安全

spring batch KafkaConsumer is not safe for multi-threaded access

我是孢子批次的新手。我有需要读取 kafka 流和过滤数据并保存在数据库中的要求。为此,我对 KafkaItemReader 使用了 spring 批处理。当我 运行 在 spring 作业中启动多个作业时,它给出 java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access 错误。这一次它只有 运行 最后一份工作。

这是 spring 批处理配置。

    @Autowired
    TaskExecutor taskExecutor;

    @Autowired
    JobRepository jobRepository;

    @Bean
    KafkaItemReader<Long, Event> kafkaItemReader() {
        Properties props = new Properties();
        props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        props.putAll(this.properties.buildConsumerProperties());
        return new KafkaItemReaderBuilder<Long, Event>()
                .partitions(0)
                .consumerProperties(props)
                .name("event-reader")
                .saveState(true)
                .topic(topicName)
                .build();
    }

    @Bean
    public TaskExecutor taskExecutor(){
        SimpleAsyncTaskExecutor asyncTaskExecutor=new SimpleAsyncTaskExecutor("spring_batch");
        asyncTaskExecutor.setConcurrencyLimit(5);
        return asyncTaskExecutor;
    }

    @Bean(name = "JobLauncher")
    public JobLauncher simpleJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(taskExecutor);
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

并且有启动新作业的控制器端点。这是我必须使用 start new Job

的方式
    @Autowired
    @Qualifier("JobLauncher")
    private JobLauncher jobLauncher;


    Map<String, JobParameter> items = new HashMap<>();
    items.put("userId", new JobParameter("UserInputId"));
    JobParameters paramaters = new JobParameters(items);
    try {
        jobLauncher.run(job, paramaters);
    } catch (Exception e) {
        e.printStackTrace();
    }

我看到KafkaItemReader不是线程safe。我想知道这种方式是否正确,或者有什么方法可以在多线程 spring 批处理环境中读取 kafka 流。 感谢和问候

KafkaItemReader 被记录为非线程安全的,这里是其 Javadoc 的摘录:

Since KafkaConsumer is not thread-safe, this reader is not thread-safe.

所以在多线程环境下使用是不正确的,不符合文档。您可以做的是每个分区使用 reader。

根据 spring documentation, it uses KafkaConsumer; which itself is not threadsafe as per their detailed documentation .

请查看您是否可以使用该文档中提到的任何方法(即解耦或每个线程单个使用者)。在您的示例中,您可能需要为 taskexecutor 使用单独的处理程序(如果您遵循解耦方法)。