Spring 批量持久化step1到step2的数据

Spring Batch persist data from step1 to step2

我已经彻底搜索了 Spring 文档和支持站点,但没有找到并回答这个问题;如果我想在 ExecutionContext 中访问和存储一些值,我是否必须编写实现 ItemStream 的自定义 databaseItemReader 和 ItemWriter,或者我可以使用 "out-of-the-box" 读取器和写入器并编辑 spring 中的 bean -batch-context.xml 文件来做到这一点?任何代码示例将不胜感激。谢谢!

http://www.springframework.org/schema/batch/spring-batch-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">

<import resource="classpath:context-datasource.xml" />

<!-- JobRepository and JobLauncher are configuration/setup classes -->
<bean id="jobRepository"
    class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean" />

<bean id="jobLauncher"
    class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
</bean>


<!-- ItemReader which reads from database and returns the row mapped by 
    rowMapper -->
<bean id="databaseItemReader"
    class="org.springframework.batch.item.database.JdbcCursorItemReader">

    <property name="dataSource" ref="dataSource" />

    <property name="sql"
        value="SELECT PartnerID, ftpUserName, ftpPassword, ftpPath, jobRunTime, jobFrequency FROM tblRosterJobParams" />

    <property name="rowMapper">
        <bean class="com.explorelearning.batch.ParamResultRowMapper" />
    </property>

</bean>


<!-- This was supposed to change to a SavingItemWriter that persists these values to the Step ExecutionContext -->
<bean id="flatFileItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter"
    scope="step">

    <property name="resource" value="file:csv/ParamResult.txt" />

    <property name="lineAggregator">

        <!--An Aggregator which converts an object into delimited list of strings -->
        <bean
            class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">

            <property name="delimiter" value="," />

            <property name="fieldExtractor">

                <!-- Extractor which returns the value of beans property through reflection -->
                <bean
                    class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
                    <property name="names" value="PartnerID" />
                </bean>
            </property>
        </bean>
    </property>
</bean>

<!-- Optional JobExecutionListener to perform business logic before and after the job -->
<bean id="jobListener" class="com.explorelearning.batch.RosterBatchJobListener" />

<!-- Optional StepExecutionListener to perform business logic before and after the job -->
<bean id="stepExecutionListener" class="com.explorelearning.batch.ParamResultStepExecutionListener" />

<!-- Optional ItemProcessor to perform business logic/filtering on the input records -->
<bean id="itemProcessor" class="com.explorelearning.batch.ParamResultItemProcessor" />

<!-- Step will need a transaction manager -->
<bean id="transactionManager"
    class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

<!-- Actual Job -->
<batch:job-repository id="jobRepository"  data-source="dataSource" table-prefix="BATCH_"
    transaction-manager="transactionManager" isolation-level-for-create="SERIALIZABLE" />
<batch:job id="RosterBatchJob" job-repository="jobRepository">
 <batch:step id="readParams" >
    <batch:tasklet transaction-manager="transactionManager" allow-start-if-complete="true">
        <batch:chunk reader="databaseItemReader"  writer="flatFileItemWriter"
            processor="itemProcessor" commit-interval="10" />
    </batch:tasklet>
 </batch:step>
 <!--<batch:step id="grabCSVs" next="validateCSVs">

 </batch:step>
 <batch:step id="validateCSVs" next="filterRecords">

 </step>
 <batch:step id="filterRecords" next="determineActions">

 </batch:step>
  <batch:step id="determineActions" next="executeActions">

 </batch:step>
  <batch:step id="executeActions" next="">

 </batch:step>  -->

</batch:job> 

已更新,问题更详细...

好的,按照我读取上下文文件的方式,您需要:

  1. 从数据库中获取一些 FTP 登录信息
  2. 下载一些 CSV 文件
  3. 验证文件(文件级?还是记录级验证?)
  4. 过滤掉一些垃圾记录
  5. 确定一些"actions"
  6. 执行一些"actions"

完成此任务的最佳方法(在我看来)是创建以下步骤:

  1. 第 1 步:一个简单的 Tasklet 查询数据库以获取 FTP 登录信息,然后将 CSV 文件下载到本地文件夹
  2. 第 2 步:分区步骤,为文件夹中的每个 CSV 创建一个分区
    1. 每个分区都会有一个 reader (FlatFileItemReader) 来读取记录。
    2. 如果记录是垃圾,记录将转到 ItemProcessor returns null
    3. 有效项目将被写入某些数据库分段 table 以供进一步操作
    4. 您可以选择使用 ClassifierItemWriter 来处理垃圾记录
  3. 第三步:下一步读取staging中的有效数据table并做"Actions"
  4. 第 4 步:也许是处理垃圾记录的另一步骤