Master/slave 使用模块化作业方法在 spring 批处理中进行作业架构设计

Master/slave Job architectural design in spring batch using modular job approach

希望你一切顺利。 我在 spring 批次中遇到设计问题。

让我解释一下:

我有一个模块化的 spring 批处理作业架构, 每个作业都有自己的配置文件和上下文。 我正在设计一个主作业来启动子作业(50 多种子作业)。

X obj 具有其他名称、状态和包含附加到它的 csv 文件的 blob。 X obj将在处理后更新。 我按照第一种方法获取所有 X obj,然后循环(在 java 流中)调用适当的作业。

但是这种方法有很多局限性。

所以我设计了一个带有 reader 处理器和写入器的 masterJob。

MasterJob 应该读取 X obj 并调用适当的子作业并更新 X obj 的状态。

masterJobReader,它调用自定义服务来获取比方说 X obj 的列表。

我开始尝试从 masterJob 处理器中启动子作业,但没有成功。

我做了一些研究,发现 JobStep 更适合这种情况。

但是我对如何将 masterJobReader 读取的项目传递给 JobStep 有参数感到困惑。

我确实看到了 DefaultJobParameterExtractor,我尝试将 Item 读入 stepExecutionContext 但它不起作用。

My question how to pass parameter from MasterJob to SubJob using JobStep approach?

如果有更好的方法来处理这个问题,那我就是你的了!

我正在使用 Java 配置和 spring 批处理 4.3。

编辑以提供示例代码:

@Configuration
public class MasterJob {

@Value("${defaultCompletionPolicy}")
private Integer defaultCompletionPolicy;

@Autowired
protected StepBuilderFactory masterStepBuilderFactory;

private Logger logger = LoggerFactory.getLogger(MasterJob.class);

@Autowired
protected JobRepository jobRepo;

@Autowired
protected PlatformTransactionManager transactionManager;

@Autowired
@Qualifier("JOB_NAME1")
private Job JOB_NAME1; // this should change to be dynamic as there are around 50 types of job

@Bean(name = "masterJob")
protected Job masterBatchJob() throws ApiException {

    return new JobBuilderFactory(jobRepo).get("masterJob")
            .incrementer(new RunIdIncrementer())
            .start(masterJobStep(masterJobReader(), masterJobWriter()))
            .next(jobStepJobStep1(null))
            .next(masterUpdateStep()) // update the state of objX
            .build();
}



@Bean(name = "masterJobStep")
protected Step masterJobStep(@Qualifier("masterJobReader") MasterJobReader masterReader,
        @Qualifier("masterJobWriter") MasterJobWriter masterWriter) throws ApiException {
    
    logger.debug("inside   masterJobStep");

    return this.masterStepBuilderFactory.get("masterJobStep")

            .<Customer, Customer>chunk(defaultCompletionPolicy)
            .reader(masterJobReader())
            .processor(masterJobProcessor())
            .writer(masterJobWriter())
            .transactionManager(transactionManager)             
            .listener(new MasterJobWriter()) // I set the parameter inside this.
            .listener(masterPromotionListener())
            .build();
}

@Bean(name = "masterJobWriter", destroyMethod = "")
@StepScope
protected MasterJobWriter masterJobWriter() {
    return new MasterJobWriter();
}

@Bean(name = "masterJobReader", destroyMethod = "")
@StepScope
protected MasterJobReader masterJobReader() throws ApiException {
    return new MasterJobReader();
}


protected FieldSetMapper<Customer> mapper() {
    return new CustomerMapper();
}

@Bean(name="masterPromotionListener")
public ExecutionContextPromotionListener masterPromotionListener() {
    ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
    
    listener.setKeys(
             
            new String[]                        
                    {   
                        "inputFile", 
                        "outputFile", 
                        "customerId",                           
                        "comments", 
                        "customer" 
                    });
    
    //listener.setStrict(true);
    return listener;
}



@Bean(name = "masterUpdateStep")
public Step masterUpdateStep() {

    return this.masterStepBuilderFactory.get("masterCleanStep").tasklet(new MasterUpdateTasklet()).build();
}



@Bean(name = "masterJobProcessor", destroyMethod = "")
@StepScope
protected MasterJobProcessor masterJobProcessor() {
    return new MasterJobProcessor();
}


@Bean
public Step jobStepJobStep1(JobLauncher jobLauncher) {
    return this.masterStepBuilderFactory.get("jobStepJobStep1")
                .job(JOB_NAME1)
                .launcher(jobLauncher)
                .parametersExtractor(jobParametersExtractor())
                .build();
}

@Bean
public DefaultJobParametersExtractor jobParametersExtractor() {
    DefaultJobParametersExtractor extractor = new DefaultJobParametersExtractor();

    extractor.setKeys(
            new String[] { "inputFile", "outputFile", , "customerId", "comments", "customer" });

    return extractor;
}
 }

这就是我在 MasterJobWriter:

中设置参数的方式
String inputFile = fetchInputFile(customer);
                String outputFile = buildOutputFileName(customer);
                
               Comments comments = "comments"; // from business logic
                
                ExecutionContext stepContext = this.stepExecution.getExecutionContext();
                stepContext.put("inputFile", inputFile);
                stepContext.put("outputFile", outputFile);                 
                stepContext.put("customerId", customer.getCustomerId());
                stepContext.put("comments", new CustomJobParameter<Comments>(comments));
                stepContext.put("customer", new CustomJobParameter<Customer>(customer));

我关注 spring 批次的 this section of the documentation

My question how to pass parameter from MasterJob to SubJob using JobStep approach?

JobParametersExtractor is what you are looking for. It allows you to extract parameters from the main job and pass them to the subjob. You can find an example here.

编辑:根据评论添加建议

I have a list of X obj in the DB. X obj has among other fields, id, type(of work), name, state and blob which contains the csv file attached to it. The blob field containing the csv file depends on the type field so it's not one pattern csv file. I need to process each X obj and save the content of the csv file in the DB and generate a csv result file containing the original data plus a comment field in the result csv file and update X obj state with the result csv field attached to X obj and other fields.

如您所见,对于单个 X 对象来说,这个过程已经很复杂了。因此,恕我直言,尝试在同一作业中处理所有 X 对象太复杂了。 So much complexity in software comes from trying to make one thing do two things..

If there is better way to deal with this then I'm all yours!

既然你是开放的建议,我会推荐两个选项:

选项 1:

如果由我决定,我会为每个 X obj 创建一个作业实例。这样,我可以 1) 并行化 2) 在失败的情况下,只重新启动失败的作业。这两个特性(可扩展性和可重启性)几乎不可能通过作业的作业方法实现。即使您有很多 X 对象,这也不是问题。您可以使用 Spring Batch 提供的一种缩放技术来并行处理事物。

选项 2:

如果您真的不能或不想使用不同的作业实例,您可以使用一个作业,其中包含一个面向块的步骤,该步骤迭代 X 个对象列表。从一条记录到另一条记录的处理逻辑似乎是独立的,因此这一步应该可以很容易地通过多线程进行扩展。