为什么 kafkaitemReader 在新作业执行中总是包含上一个作业 运行 的最后偏移量记录?

why kafkaitemReader is always including last offset record of previous job run in the new job execution?

我在作业中使用 spring 批处理 kafkaItemReader,该作业以 10 秒的固定延迟执行。一旦块大小为 1000 的作业完成,spring 调度程序会在延迟 10 秒后再次重新提交相同的作业。我观察到 KafkaReader 总是在后续作业执行中包含最后一个偏移量记录。假设,在第一次作业执行中,记录是从偏移量 1-1000 处理的,在我的下一次作业执行中,我希望 kafkaItemReader 从 1001 偏移量中选择记录。但是,在下一次执行中,kafkaItemReader 将从偏移量 1000(已处理)中拾取它。

添加代码块

//正在使用具有以下参数的计划任务调度程序提交作业

<task:scheduled-tasks>
    <task:scheduled ref="runScheduler" method="run" fixed-delay="5000"/>
</task:scheduled-tasks>

//每次提交的作业参数

String dateParam = new Date().toString(); 作业参数参数 = 新的 JobParametersBuilder().addString("date", dateParam).toJobParameters

//下面是kafkaItemReader的配置

@Bean
public KafkaItemReader<String, String> kafkaItemReader() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "");
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "");
    props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "");
    props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "");
    props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "");
    props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    Map<TopicPartition,Long> partitionOffset = new HashMap<>();

    return new KafkaItemReaderBuilder<String, String>()
            .partitions(0)
            .consumerProperties(props)
            .name("customers-reader")
            .saveState(true)
            .pollTimeout(Duration.ofSeconds(10))
            .topic("")
            .partitionOffsets(partitionOffset)
            .build();
}

@Bean
public Step kafkaStep(StepBuilderFactory stepBuilderFactory,ItemWriter testItemWriter,KafkaItemReader kafkaItemReader) throws Exception {
    return stepBuilderFactory.get("kafkaStep")
            .chunk(10)
            .reader(kafkaItemReader)
            .writer(testItemWriter)
            .build();
}

@Bean
public Job kafkaJob(Step kafkaStep,JobBuilderFactory jobBuilderFactory) throws Exception {
    return jobBuilderFactory.get("kafkaJob").incrementer(new RunIdIncrementer())
            .start(kafkaStep)
            .build();
}

我是否缺少导致此行为的某些配置?如果我停止并重新 运行 应用程序,我看不到这种行为,在这种情况下它会正确选择偏移量。

您 运行 在每个 shcedule 上创建一个新的作业实例(通过使用不同的日期作为识别作业参数),但是您的 reader 是一个单例 bean。这意味着它将被每个 运行 重复使用,而无需使用正确的偏移量重新初始化。您可以使其步进范围内为每个 运行:

拥有一个 reader 的新实例
@Bean
@StepScope
public KafkaItemReader<String, String> kafkaItemReader() {
   ...
}

这会给您带来与重新启动应用程序相同的行为,您说这可以解决问题。