Spring 批处理分区不起作用 compositeitemprocessor
Spring Batch partition doesnt work composite itemprocessor
我有一个 Spring 批处理分区作业。我正在使用 CompositeProcessor
,从数据库中读取数据并将这些项目保存到 CopyOnWriteArrayList
中。因为环境是并发的,但是我的 CopyOnWriteArrayList
正在被其他线程使用并混合信息,我不知道为什么以及我做错了什么,输出将它们写入每个线程的文件中。
public class CustomerItemProcessor implements ItemProcessor<beangenerico,CopyOnWriteArrayList<beanCustomer>> {
private CustomerDAO customerDAO;
private CopyOnWriteArrayList<beanCustomer> listbean;
public CopyOnWriteArrayList<beanCustomer> process(beangenerico rangos) throws Exception {
listbean = customerDAO.getAccAgentes(rangos);
if(listbean != null) {
//customer.setId(currentCustomer.getId());
return listbean;
}else{
return null;
}
}
我的batch im的配置XML:
<batch:job id="partitionJob" xmlns="http://www.springframework.org/schema/batch">
<batch:step id="masterStep">
<batch:partition step="slave" partitioner="rangePartitioner">
<batch:handler grid-size="10" task-executor="taskExecutor"/>
</batch:partition>
</batch:step>
</batch:job>
<!-- each thread will run this job, with different stepExecutionContext values. -->
<batch:step id="slave" xmlns="http://www.springframework.org/schema/batch">
<batch:tasklet task-executor="taskExecutor" throttle-limit="1">
<batch:chunk reader="beaniniendreader" writer="tempRecordsWriter" processor="completeItemProcessor" commit-interval="1" />
</batch:tasklet>
</batch:step>
<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
<bean id="rangePartitioner" class="my.package.springbatch.RangePartitioner" />
<bean id="beaniniendreader" class="my.package.springbatch.FormiikReader" scope="step"></bean>
<bean id="beanprocessor" class="my.package.springbatch.FormiikProcessor" scope="step">
<property name="accountExecutiveDao" ref="accountExecutiveDao"/>
</bean>
<bean id="beanprocessor2" class="my.package.springbatch.CustomerItemProcessor" scope="step">
<property name="customerDAO" ref="customerAccDao"/>
</bean>
<bean id="completeItemProcessor" class="org.springframework.batch.item.support.CompositeItemProcessor">
<property name="delegates">
<list>
<ref bean="beanprocessor2"/>
<ref bean="accItemprocessor"/>
<ref bean="beanaccDataItem"/>
</list>
</property>
</bean>
<bean id="tempRecordsWriter" class="my.package.springbatch.ListDelegateWriter" scope="step">
<property name="delegate" ref="flatFileItemWriterPartition"/>
</bean>
<!-- csv file writer -->
<bean id="flatFileItemWriterPartition" class="org.springframework.batch.item.file.FlatFileItemWriter"
scope="step" >
<property name="resource"
value="file:csv/outputs/users.processed#{stepExecutionContext[fromId]}-#{stepExecutionContext[toId]}.csv" />
<property name="appendAllowed" value="false" />
<property name="lineAggregator">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
<property name="delimiter" value="," />
<property name="fieldExtractor">
<bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
<property name="names" value="cuenta, name, purchasedPackage" />
</bean>
</property>
</bean>
</property>
</bean>
我回到我的代码的主题,建议我使用 Threadlocal 来存储线程特定的数据,从而使它起作用。在这里,我再次放上我的代码。感谢您的回复。
public class CustomerItemProcessor implements ItemProcessor<beangenerico,ThreadLocal<CopyOnWriteArrayList<beanCustomer>>> {
private CustomerDAO customerDAO;
private ThreadLocal<CopyOnWriteArrayList<beanCustomer>> listbean = new ThreadLocal<CopyOnWriteArrayList<beanCustomer>>();
public ThreadLocal<CopyOnWriteArrayList<beanCustomer>> process(beangenerico rangos) throws Exception {
listbean.set(new CopyOnWriteArrayList<beanCustomer>());
listbean = customerDAO.getAccAgentes(rangos);
if(listbean != null) {
return listbean;
} else {
return null;
}
}
public void setCustomerDAO(CustomerDAO customerDAO) {
this.customerDAO = customerDAO;
}
}
我有一个 Spring 批处理分区作业。我正在使用 CompositeProcessor
,从数据库中读取数据并将这些项目保存到 CopyOnWriteArrayList
中。因为环境是并发的,但是我的 CopyOnWriteArrayList
正在被其他线程使用并混合信息,我不知道为什么以及我做错了什么,输出将它们写入每个线程的文件中。
public class CustomerItemProcessor implements ItemProcessor<beangenerico,CopyOnWriteArrayList<beanCustomer>> {
private CustomerDAO customerDAO;
private CopyOnWriteArrayList<beanCustomer> listbean;
public CopyOnWriteArrayList<beanCustomer> process(beangenerico rangos) throws Exception {
listbean = customerDAO.getAccAgentes(rangos);
if(listbean != null) {
//customer.setId(currentCustomer.getId());
return listbean;
}else{
return null;
}
}
我的batch im的配置XML:
<batch:job id="partitionJob" xmlns="http://www.springframework.org/schema/batch">
<batch:step id="masterStep">
<batch:partition step="slave" partitioner="rangePartitioner">
<batch:handler grid-size="10" task-executor="taskExecutor"/>
</batch:partition>
</batch:step>
</batch:job>
<!-- each thread will run this job, with different stepExecutionContext values. -->
<batch:step id="slave" xmlns="http://www.springframework.org/schema/batch">
<batch:tasklet task-executor="taskExecutor" throttle-limit="1">
<batch:chunk reader="beaniniendreader" writer="tempRecordsWriter" processor="completeItemProcessor" commit-interval="1" />
</batch:tasklet>
</batch:step>
<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
<bean id="rangePartitioner" class="my.package.springbatch.RangePartitioner" />
<bean id="beaniniendreader" class="my.package.springbatch.FormiikReader" scope="step"></bean>
<bean id="beanprocessor" class="my.package.springbatch.FormiikProcessor" scope="step">
<property name="accountExecutiveDao" ref="accountExecutiveDao"/>
</bean>
<bean id="beanprocessor2" class="my.package.springbatch.CustomerItemProcessor" scope="step">
<property name="customerDAO" ref="customerAccDao"/>
</bean>
<bean id="completeItemProcessor" class="org.springframework.batch.item.support.CompositeItemProcessor">
<property name="delegates">
<list>
<ref bean="beanprocessor2"/>
<ref bean="accItemprocessor"/>
<ref bean="beanaccDataItem"/>
</list>
</property>
</bean>
<bean id="tempRecordsWriter" class="my.package.springbatch.ListDelegateWriter" scope="step">
<property name="delegate" ref="flatFileItemWriterPartition"/>
</bean>
<!-- csv file writer -->
<bean id="flatFileItemWriterPartition" class="org.springframework.batch.item.file.FlatFileItemWriter"
scope="step" >
<property name="resource"
value="file:csv/outputs/users.processed#{stepExecutionContext[fromId]}-#{stepExecutionContext[toId]}.csv" />
<property name="appendAllowed" value="false" />
<property name="lineAggregator">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
<property name="delimiter" value="," />
<property name="fieldExtractor">
<bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
<property name="names" value="cuenta, name, purchasedPackage" />
</bean>
</property>
</bean>
</property>
</bean>
我回到我的代码的主题,建议我使用 Threadlocal 来存储线程特定的数据,从而使它起作用。在这里,我再次放上我的代码。感谢您的回复。
public class CustomerItemProcessor implements ItemProcessor<beangenerico,ThreadLocal<CopyOnWriteArrayList<beanCustomer>>> {
private CustomerDAO customerDAO;
private ThreadLocal<CopyOnWriteArrayList<beanCustomer>> listbean = new ThreadLocal<CopyOnWriteArrayList<beanCustomer>>();
public ThreadLocal<CopyOnWriteArrayList<beanCustomer>> process(beangenerico rangos) throws Exception {
listbean.set(new CopyOnWriteArrayList<beanCustomer>());
listbean = customerDAO.getAccAgentes(rangos);
if(listbean != null) {
return listbean;
} else {
return null;
}
}
public void setCustomerDAO(CustomerDAO customerDAO) {
this.customerDAO = customerDAO;
}
}