Spring 批处理 - 将项目分类为不同的项目类型
Spring Batch - Classify items into different item types
参考link:,其实我想把物品归类到不同的物品类型。
这里我想根据不同的条件将item分为Customer
Table和NewCustomer
table,这样我就可以把数据保存到两个不同的tables.
这是我的示例代码,它似乎运行不佳。
ClassifierCompositeItemApplication.java
@EnableBatchProcessing
@SpringBootApplication
public class ClassifierCompositeItemApplication {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Value("classpath:input/customer.csv")
private Resource inputResource;
public ClassifierCompositeItemApplication(JobBuilderFactory jobs, StepBuilderFactory steps) {
this.jobBuilderFactory = jobs;
this.stepBuilderFactory = steps;
}
@Bean
@StepScope
public FlatFileItemReader<Customer> classifierCompositeWriterItemReader() {
return new FlatFileItemReaderBuilder<Customer>()
.name("customerFileReader")
.resource(inputResource)
.delimited()
.names(new String[] { "firstName", "middleInitial", "lastName", "address", "city", "state", "zip" })
.targetType(Customer.class)
.build();
}
@Bean
public ClassifierCompositeItemWriter<Customer> compositeItemWriter() throws IOException {
final Classifier<Customer, ItemWriter<? super Customer>> classifier = new CustomerClassifier(
this.customer1(), this.customer2());
return new ClassifierCompositeItemWriterBuilder<Customer>()
.classifier(classifier)
.build();
}
@Bean
@StepScope
public ItemStreamWriter<Customer> customer1() throws IOException {
System.out.println("Customer #1");
return new ItemStreamWriter<Customer>() {
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
}
@Override
public void close() throws ItemStreamException {
}
@Override
public void write(List<? extends Customer> items) throws Exception {
for (Customer customer : items) {
System.out.println(customer);
}
}
};
}
@Bean
public ItemStreamWriter<Customer> customer2() {
System.out.println("Customer #2");
return new ItemStreamWriter<Customer>() {
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
}
@Override
public void close() throws ItemStreamException {
}
@Override
public void write(List<? extends Customer> items) throws Exception {
for (Customer customer : items) {
System.out.println(customer);
}
}
};
}
@Bean
public Step classifierCompositeWriterStep() throws IOException {
return this.stepBuilderFactory.get("compositeWriterStep")
.<Customer, Customer>chunk(10)
.reader(this.classifierCompositeWriterItemReader())
.writer(this.compositeItemWriter())
.stream(this.customer1())
.stream(this.customer2())
.build();
}
@Bean
public Job classifierCompositeWriterJob() throws IOException {
return this.jobBuilderFactory.get("compositeWriterJob")
.start(this.classifierCompositeWriterStep())
.build();
}
public static void main(String[] args) {
SpringApplication.run(ClassifierCompositeItemApplication.class, args);
}
}
CustomerClassifier.java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class CustomerClassifier implements Classifier<Customer, ItemWriter<? super Customer>> {
private static final long serialVersionUID = 1L;
private ItemWriter<Customer> fileItemWriter;
private ItemWriter<Customer> jdbcItemWriter;
@Override
public ItemWriter<? super Customer> classify(Customer customer) {
if (customer.getState().matches("^[A-M].*")) {
return fileItemWriter;
} else {
return jdbcItemWriter;
}
}
}
Customer.java
public class Customer implements Serializable {
private String firstName;
private String middleInitial;
private String lastName;
....
....
//
}
新建Customer.java
public class NewCustomer implements Serializable {
private String firstName;
private String middleInitial;
private String lastName;
....
....
// All different fields
}
From Reader, say reader has 10 fields, 5 fields will go into Customer table and other 5 fields will go into NewCustomer table
在那种情况下,您不需要这个 NewCustomer
对象和这个分类器。一个复合写入器有两个 jdbc 写入器就足够了:一个确实插入到 customer
字段 1,.. 字段 5,另一个确实插入到 new_customer
字段 6,... 字段 10。字段 1到 10 来自 Customer
您的 reader.
返回的对象
Great, Thanks, Could you please show some code ?
首先,创建两个 jdbc 写入器,每个写入器应插入给定的 table:
@Bean
public JdbcBatchItemWriter<Customer> writer1(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Customer>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO CUSTOMER (field1, field2) VALUES (:field1, :field2)")
.dataSource(dataSource)
.build();
}
@Bean
public JdbcBatchItemWriter<Customer> writer2(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Customer>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO NEW_CUSTOMER (field3, field4) VALUES (:field3, :field4)")
.dataSource(dataSource)
.build();
}
然后将它们组合成复合编写器:
@Bean
public CompositeItemWriter<Customer> compositeItemWriter(DataSource dataSource) {
return new CompositeItemWriterBuilder<Customer>()
.delegates(Arrays.asList(writer1(dataSource), writer2(dataSource)))
.build();
}
显然,您的 reader 返回的 Customer
对象应该有 field1
.. field4
.
参考link:
这里我想根据不同的条件将item分为Customer
Table和NewCustomer
table,这样我就可以把数据保存到两个不同的tables.
这是我的示例代码,它似乎运行不佳。
ClassifierCompositeItemApplication.java
@EnableBatchProcessing
@SpringBootApplication
public class ClassifierCompositeItemApplication {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Value("classpath:input/customer.csv")
private Resource inputResource;
public ClassifierCompositeItemApplication(JobBuilderFactory jobs, StepBuilderFactory steps) {
this.jobBuilderFactory = jobs;
this.stepBuilderFactory = steps;
}
@Bean
@StepScope
public FlatFileItemReader<Customer> classifierCompositeWriterItemReader() {
return new FlatFileItemReaderBuilder<Customer>()
.name("customerFileReader")
.resource(inputResource)
.delimited()
.names(new String[] { "firstName", "middleInitial", "lastName", "address", "city", "state", "zip" })
.targetType(Customer.class)
.build();
}
@Bean
public ClassifierCompositeItemWriter<Customer> compositeItemWriter() throws IOException {
final Classifier<Customer, ItemWriter<? super Customer>> classifier = new CustomerClassifier(
this.customer1(), this.customer2());
return new ClassifierCompositeItemWriterBuilder<Customer>()
.classifier(classifier)
.build();
}
@Bean
@StepScope
public ItemStreamWriter<Customer> customer1() throws IOException {
System.out.println("Customer #1");
return new ItemStreamWriter<Customer>() {
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
}
@Override
public void close() throws ItemStreamException {
}
@Override
public void write(List<? extends Customer> items) throws Exception {
for (Customer customer : items) {
System.out.println(customer);
}
}
};
}
@Bean
public ItemStreamWriter<Customer> customer2() {
System.out.println("Customer #2");
return new ItemStreamWriter<Customer>() {
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
}
@Override
public void close() throws ItemStreamException {
}
@Override
public void write(List<? extends Customer> items) throws Exception {
for (Customer customer : items) {
System.out.println(customer);
}
}
};
}
@Bean
public Step classifierCompositeWriterStep() throws IOException {
return this.stepBuilderFactory.get("compositeWriterStep")
.<Customer, Customer>chunk(10)
.reader(this.classifierCompositeWriterItemReader())
.writer(this.compositeItemWriter())
.stream(this.customer1())
.stream(this.customer2())
.build();
}
@Bean
public Job classifierCompositeWriterJob() throws IOException {
return this.jobBuilderFactory.get("compositeWriterJob")
.start(this.classifierCompositeWriterStep())
.build();
}
public static void main(String[] args) {
SpringApplication.run(ClassifierCompositeItemApplication.class, args);
}
}
CustomerClassifier.java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class CustomerClassifier implements Classifier<Customer, ItemWriter<? super Customer>> {
private static final long serialVersionUID = 1L;
private ItemWriter<Customer> fileItemWriter;
private ItemWriter<Customer> jdbcItemWriter;
@Override
public ItemWriter<? super Customer> classify(Customer customer) {
if (customer.getState().matches("^[A-M].*")) {
return fileItemWriter;
} else {
return jdbcItemWriter;
}
}
}
Customer.java
public class Customer implements Serializable {
private String firstName;
private String middleInitial;
private String lastName;
....
....
//
}
新建Customer.java
public class NewCustomer implements Serializable {
private String firstName;
private String middleInitial;
private String lastName;
....
....
// All different fields
}
From Reader, say reader has 10 fields, 5 fields will go into Customer table and other 5 fields will go into NewCustomer table
在那种情况下,您不需要这个 NewCustomer
对象和这个分类器。一个复合写入器有两个 jdbc 写入器就足够了:一个确实插入到 customer
字段 1,.. 字段 5,另一个确实插入到 new_customer
字段 6,... 字段 10。字段 1到 10 来自 Customer
您的 reader.
Great, Thanks, Could you please show some code ?
首先,创建两个 jdbc 写入器,每个写入器应插入给定的 table:
@Bean
public JdbcBatchItemWriter<Customer> writer1(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Customer>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO CUSTOMER (field1, field2) VALUES (:field1, :field2)")
.dataSource(dataSource)
.build();
}
@Bean
public JdbcBatchItemWriter<Customer> writer2(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Customer>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO NEW_CUSTOMER (field3, field4) VALUES (:field3, :field4)")
.dataSource(dataSource)
.build();
}
然后将它们组合成复合编写器:
@Bean
public CompositeItemWriter<Customer> compositeItemWriter(DataSource dataSource) {
return new CompositeItemWriterBuilder<Customer>()
.delegates(Arrays.asList(writer1(dataSource), writer2(dataSource)))
.build();
}
显然,您的 reader 返回的 Customer
对象应该有 field1
.. field4
.