使用 Spring Batch 处理大文件

Processing a large file using spring batch

我有一个大文件,可能包含 100K 到 500K 条记录。我打算使用面向块的处理,我的想法是

1) 按记录数将大文件拆分为较小的文件,假设每个文件包含 10K 条记录。

2) 如果有 100K 条记录,那么我将得到 10 个文件,每个文件包含 10K 条记录

3) 我想对这 10 个文件进行分区,并希望使用 5 个线程进行处理。我正在考虑使用自定义的 MultiResourcePartitioner

4) 5 个线程应该处理在拆分过程中创建的所有 10 个文件。

5) 我不想创建与文件数相同的线程数,因为在这种情况下我可能会遇到内存问题。我希望的是:无论有多少个文件,都只使用 5 个线程来处理它们(线程数可以根据我的需求增加)。

有哪位高手能告诉我,这能否用 Spring Batch 实现?如果可以,请分享一些指引或参考实现

提前致谢

作业配置 XML

<description>Spring Batch File Chunk Processing</description>

<import resource="../config/batch-context.xml" />

<!-- Partitioned job with a single "master" step. The master step does no item processing
     itself: it asks the "partitioner" bean to create the partitions and hands them to the
     "partitionHandler" bean for execution. The job is declared non-restartable. -->
<batch:job id="file-partition-batch" job-repository="jobRepository" restartable="false">        
    <batch:step id="master">
        <batch:partition partitioner="partitioner" handler="partitionHandler" />
    </batch:step>
</batch:job>

<!-- Worker step, referenced by partitionHandler's "step" property. It is chunk-oriented:
     items come from "reader", pass through "compositeProcessor" and are written by
     "compositeWriter", with a commit every 5 items.
     NOTE(review): reconFlatFileWriter is wrapped by the custom reconFlatFileCustomWriter;
     confirm it is opened/closed by the step (e.g. registered under batch:streams),
     otherwise the flat-file writer may never be opened. -->
<batch:step id="slave">
    <batch:tasklet>
        <batch:chunk reader="reader" processor="compositeProcessor"
            writer="compositeWriter" commit-interval="5">
        </batch:chunk>
    </batch:tasklet>
</batch:step>

<!-- Executes the "slave" step for each partition using the "taskExecutor" thread pool.
     gridSize (5) is the partition count requested from the partitioner; whether it is
     honored depends on the partitioner implementation. -->
<bean id="partitionHandler" class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler">
    <property name="taskExecutor" ref="taskExecutor"/>
    <property name="step" ref="slave" />
    <property name="gridSize" value="5" />
</bean>

<!-- Project-specific partitioner, configured with every *.txt file in the input directory
     and a thread-name value ("feed-processor").
     NOTE(review): presumably it publishes the 'fileName' and 'threadName' entries that the
     reader and reconFlatFileWriter read from the step execution context; confirm against
     com.poc.partitioner.FileMultiResourcePartitioner. -->
<bean id="partitioner" class="com.poc.partitioner.FileMultiResourcePartitioner">
    <property name="resources" value="file:/Users/anupghosh/Documents/Spring_Batch/FilePartitionBatch/*.txt" />
    <property name="threadName" value="feed-processor" />
</bean>

<!-- Thread pool used by partitionHandler. Core and max size are both 5, so the pool is
     configured as a fixed-size pool of 5 threads. -->
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="5" />
    <property name="maxPoolSize" value="5" />
</bean>

<!-- Step-scoped flat-file reader. The input resource is late-bound from the 'fileName'
     entry of the step execution context. Each line is split on '|' into seven named
     fields and mapped to an item by com.poc.mapper.FileRowMapper. -->
<bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
    <!-- Resolved per step execution, which is why the bean must be step-scoped. -->
    <property name="resource" value="#{stepExecutionContext['fileName']}" />

    <property name="lineMapper">
        <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
        <property name="lineTokenizer">
            <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                <property name="delimiter" value="|"/>
                <!-- Field names assigned to the tokens, in column order. -->
                <property name="names" value="key,docName,docTypCD,itemType,itemNum,launchDate,status" />
            </bean>
        </property>
        <property name="fieldSetMapper">
            <bean class="com.poc.mapper.FileRowMapper" />
        </property>
        </bean>
    </property>
</bean>

<!-- Item processor that validates each item with the "feedRowValidator" bean.
     It is the first delegate of compositeProcessor. -->
<bean id="validatingProcessor" class="org.springframework.batch.item.validator.ValidatingItemProcessor">
    <constructor-arg ref="feedRowValidator" />
</bean>

<!-- Project-specific item processor; second delegate of compositeProcessor.
     The id spelling ("feedProcesor") must match the reference in compositeProcessor. -->
<bean id="feedProcesor" class="com.poc.processor.FeedProcessor" />

<!-- Processor used by the "slave" step. Chains the delegates in list order:
     validatingProcessor first, then feedProcesor. -->
<bean id="compositeProcessor" class="org.springframework.batch.item.support.CompositeItemProcessor" scope="step">
    <property name="delegates">
        <list>
            <ref bean="validatingProcessor" />
            <ref bean="feedProcesor" />
        </list>
    </property>
</bean>

<!-- Project-specific item writer; first delegate of compositeWriter. -->
<bean id="recordDecWriter" class="com.poc.writer.RecordDecWriter" />

<!-- Project-specific writer that is given the step-scoped "reconFlatFileWriter" bean;
     second delegate of compositeWriter.
     NOTE(review): presumably it forwards items to reconFlatFileWriter; confirm against
     com.poc.writer.ReconFileWriter. -->
<bean id="reconFlatFileCustomWriter" class="com.poc.writer.ReconFileWriter">
    <property name="reconFlatFileWriter" ref="reconFlatFileWriter" />
</bean>

<!-- Step-scoped flat-file writer for the recon output. The output path is late-bound from
     the 'threadName' entry of the step execution context, so each distinct threadName value
     gets its own recon-<threadName>.txt file; an existing file with that name is deleted
     first. Each output line contains only the item's validationError property. -->
<bean id="reconFlatFileWriter" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step">
    <!-- The context key is quoted as a string literal, matching the reader's
         stepExecutionContext['fileName'] lookup. -->
    <property name="resource" value="file:/Users/anupghosh/Documents/Spring_Batch/recon-#{stepExecutionContext['threadName']}.txt" />
    <property name="shouldDeleteIfExists" value="true" />
    <property name="lineAggregator">
        <bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
            <property name="delimiter" value="|" />
            <property name="fieldExtractor">
                <bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
                    <property name="names" value="validationError" />
                </bean>
            </property>
        </bean>
    </property>
</bean>

<!-- Writer used by the "slave" step. Passes each chunk to the delegates in list order:
     recordDecWriter first, then reconFlatFileCustomWriter. -->
<bean id="compositeWriter" class="org.springframework.batch.item.support.CompositeItemWriter">
    <property name="delegates">
        <list>
            <ref bean="recordDecWriter" />
            <ref bean="reconFlatFileCustomWriter" />
        </list>
    </property>
</bean>

<!-- Adapts the project-specific com.poc.validator.FeedRowValidator so that it can be
     used by validatingProcessor. -->
<bean id="feedRowValidator" class="org.springframework.batch.item.validator.SpringValidator">
    <property name="validator">
        <bean class="com.poc.validator.FeedRowValidator"/>
    </property>
</bean>     

我已使用 MultiResourcePartitioner 解决了这个问题。下面是 Java 配置:

/**
 * Creates the partitioner for the master step: one partition per {@code *.csv} file found
 * directly under {@code filePath}.
 *
 * <p>{@link MultiResourcePartitioner} stores each file's URL in the partition's execution
 * context (default key {@code fileName}), which is the key the step-scoped reader binds to.
 *
 * @return the configured partitioner
 * @throws java.io.UncheckedIOException if the file pattern cannot be resolved
 */
@Bean
    public Partitioner partitioner() {
        ResourcePatternResolver resolver =
                new PathMatchingResourcePatternResolver(this.getClass().getClassLoader());

        Resource[] resources;
        try {
            resources = resolver.getResources("file:" + filePath + "/"+"*.csv");
        } catch (java.io.IOException e) {
            // getResources declares a checked IOException; rethrow unchecked so the bean
            // method signature stays unchanged for callers such as masterStep().
            throw new java.io.UncheckedIOException(
                    "Unable to resolve input files under " + filePath, e);
        }

        MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
        partitioner.setResources(resources);
        // partition(...) is intentionally not called here: the framework invokes it when the
        // partitioned step runs, and a call whose result is discarded has no effect.
        return partitioner;
    }

    /**
     * Thread pool that runs the partitions of the master step; at most 4 partitions are
     * processed concurrently.
     *
     * @return an initialized executor with a fixed size of 4 threads
     */
    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        // The core size must be raised together with the max size: with the default core
        // size of 1 and the default unbounded queue, extra threads are never created and
        // all partitions would run sequentially on a single thread.
        taskExecutor.setCorePoolSize(4);
        taskExecutor.setMaxPoolSize(4);
        // Kept from the original so the executor is usable even if this method is called
        // directly rather than through the container.
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }   

    /**
     * Master (partitioning) step: splits the work with {@link #partitioner()} and runs the
     * worker step {@link #processData()} once per partition on {@link #taskExecutor()}.
     *
     * <p>The name passed to {@code partitioner(String, Partitioner)} is the step name used
     * for the partition executions; it is kept as {@code "ProcessDataStep"}.
     *
     * @return the partitioned master step
     */
    @Bean
    @Qualifier("masterStep")
    public Step masterStep() {
        return stepBuilderFactory.get("masterStep")
                // Worker step to execute per partition: the processData() bean declared in
                // this configuration (the original referenced ProcessDataStep()).
                .partitioner(processData())
                .partitioner("ProcessDataStep",partitioner())   
                .taskExecutor(taskExecutor())
                .listener(pcStressStepListener)
                .build();
    }


    /**
     * Worker step: chunk-oriented processing of {@code pojo} items with a chunk size
     * (commit interval) of 5000.
     *
     * <p>NOTE(review): {@code reader} and {@code writer} are referenced as plain identifiers
     * here, presumably injected fields of the enclosing class; confirm that {@code reader}
     * resolves to the step-scoped reader bean.
     *
     * @return the chunk-oriented worker step
     */
    @Bean
    @Qualifier("processData")
    public Step processData() {
        return stepBuilderFactory.get("processData")
                // Input and output item types are both pojo.
                .<pojo, pojo> chunk(5000)
                .reader(reader)             
                .processor(processor())
                .writer(writer)         
                .build();
    }



    @Bean(name="reader")
    @StepScope
    public FlatFileItemReader<pojo> reader(@Value("#{stepExecutionContext['fileName']}") String filename) {

        FlatFileItemReader<pojo> reader = new FlatFileItemReader<>();
        reader.setResource(new UrlResource(filename));
        reader.setLineMapper(new DefaultLineMapper<pojo>() {
            {
                setLineTokenizer(new DelimitedLineTokenizer() {
                    {
                        setNames(FILE HEADER);


                    }
                });
                setFieldSetMapper(new BeanWrapperFieldSetMapper<pojo>() {
                    {
                        setTargetType(pojo.class);
                    }
                });
            }
        });
        return reader;
    }