Spring Batch - Classify items into different item types

With reference to link: , what I actually want is to classify items into different item types.

Here I want to split the items into CustomerTable and NewCustomertable based on different conditions, so that I can save the data into two different tables.

Here is my sample code; it does not seem to work properly.

ClassifierCompositeItemApplication.java

@EnableBatchProcessing
@SpringBootApplication
public class ClassifierCompositeItemApplication {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Value("classpath:input/customer.csv")
    private Resource inputResource;

    public ClassifierCompositeItemApplication(JobBuilderFactory jobs, StepBuilderFactory steps) {
        this.jobBuilderFactory = jobs;
        this.stepBuilderFactory = steps;
    }

    @Bean
    @StepScope
    public FlatFileItemReader<Customer> classifierCompositeWriterItemReader() {
        return new FlatFileItemReaderBuilder<Customer>()
                .name("customerFileReader")
                .resource(inputResource)
                .delimited()
                .names(new String[] { "firstName", "middleInitial", "lastName", "address", "city", "state", "zip" })
                .targetType(Customer.class)
                .build();
    }

    @Bean
    public ClassifierCompositeItemWriter<Customer> compositeItemWriter() throws IOException {
        final Classifier<Customer, ItemWriter<? super Customer>> classifier = new CustomerClassifier(
                this.customer1(), this.customer2());

        return new ClassifierCompositeItemWriterBuilder<Customer>()
                .classifier(classifier)
                .build();
    }

    @Bean
    @StepScope
    public ItemStreamWriter<Customer> customer1() throws IOException {
        System.out.println("Customer #1");
        return new ItemStreamWriter<Customer>() {

            @Override
            public void open(ExecutionContext executionContext) throws ItemStreamException {

            }

            @Override
            public void update(ExecutionContext executionContext) throws ItemStreamException {

            }

            @Override
            public void close() throws ItemStreamException {

            }

            @Override
            public void write(List<? extends Customer> items) throws Exception {
                for (Customer customer : items) {
                    System.out.println(customer);
                }
            }
        };
    }

    @Bean
    public ItemStreamWriter<Customer> customer2() {
        System.out.println("Customer #2");
        return new ItemStreamWriter<Customer>() {

            @Override
            public void open(ExecutionContext executionContext) throws ItemStreamException {

            }

            @Override
            public void update(ExecutionContext executionContext) throws ItemStreamException {

            }

            @Override
            public void close() throws ItemStreamException {

            }

            @Override
            public void write(List<? extends Customer> items) throws Exception {
                for (Customer customer : items) {
                    System.out.println(customer);
                }
            }
        };
    }


    @Bean
    public Step classifierCompositeWriterStep() throws IOException {
        return this.stepBuilderFactory.get("compositeWriterStep")
                .<Customer, Customer>chunk(10)
                .reader(this.classifierCompositeWriterItemReader())
                .writer(this.compositeItemWriter())
                .stream(this.customer1())
                .stream(this.customer2())
                .build();

    }


    @Bean
    public Job classifierCompositeWriterJob() throws IOException {
        return this.jobBuilderFactory.get("compositeWriterJob")
                .start(this.classifierCompositeWriterStep())
                .build();
    }


    public static void main(String[] args) {
        SpringApplication.run(ClassifierCompositeItemApplication.class, args);
    }
}

CustomerClassifier.java

@Data
@AllArgsConstructor
@NoArgsConstructor
public class CustomerClassifier implements Classifier<Customer, ItemWriter<? super Customer>> {
    private static final long serialVersionUID = 1L;

    private ItemWriter<Customer> fileItemWriter;
    private ItemWriter<Customer> jdbcItemWriter;

    @Override
    public ItemWriter<? super Customer> classify(Customer customer) {
        if (customer.getState().matches("^[A-M].*")) {
            return fileItemWriter;
        } else {
            return jdbcItemWriter;
        }
    }
}
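
For illustration, here is a plain-Java sketch of what classifier-based routing does with a chunk, as far as I understand ClassifierCompositeItemWriter: the items are grouped by the writer the classifier returns, and each writer is then called once with its own group. `SimpleWriter`, `RecordingWriter` and `ClassifierRoutingSketch` are hypothetical names used only here; they are not Spring Batch types, and the items are reduced to bare state strings.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for an item writer; not the real Spring Batch interface.
interface SimpleWriter<T> {
    void write(List<T> items);
}

// Hypothetical writer that only remembers what it was asked to write.
final class RecordingWriter<T> implements SimpleWriter<T> {
    final List<T> written = new ArrayList<>();

    @Override
    public void write(List<T> items) {
        written.addAll(items);
    }
}

final class ClassifierRoutingSketch {

    // Groups the chunk by the writer the classifier picks, then calls each writer once.
    static <T> void writeChunk(List<T> chunk, Function<T, SimpleWriter<T>> classifier) {
        Map<SimpleWriter<T>, List<T>> groups = new LinkedHashMap<>();
        for (T item : chunk) {
            groups.computeIfAbsent(classifier.apply(item), w -> new ArrayList<>()).add(item);
        }
        groups.forEach((writer, items) -> writer.write(items));
    }

    // Same rule as CustomerClassifier: states starting with A..M go to the first writer.
    static SimpleWriter<String> byState(String state, SimpleWriter<String> first, SimpleWriter<String> second) {
        return state.matches("^[A-M].*") ? first : second;
    }
}
```

Note that every item ends up in exactly one writer. A classifier chooses between writers per item; it does not split the fields of a single item across two tables.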

Customer.java

public class Customer implements Serializable {

    private String firstName;
    private String middleInitial;
    private String lastName;
    ....
    ....
    //
}

NewCustomer.java

public class NewCustomer implements Serializable {

    private String firstName;
    private String middleInitial;
    private String lastName;
    ....
    ....
    // All different fields
}

Say the reader returns 10 fields: 5 of them should go into the Customer table and the other 5 into the NewCustomer table.

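In that case, you don't need this NewCustomer object or this classifier. A composite writer with two JDBC writers is enough: one inserts field1 .. field5 into customer, and the other inserts field6 .. field10 into new_customer. Fields 1 to 10 come from the Customer object returned by your reader.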

Great, thanks. Could you please show some code?

First, create two JDBC writers, each of which inserts into a given table:

@Bean
public JdbcBatchItemWriter<Customer> writer1(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<Customer>()
            .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
            .sql("INSERT INTO CUSTOMER (field1, field2) VALUES (:field1, :field2)")
            .dataSource(dataSource)
            .build();
}

@Bean
public JdbcBatchItemWriter<Customer> writer2(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<Customer>()
            .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
            .sql("INSERT INTO NEW_CUSTOMER (field3, field4) VALUES (:field3, :field4)")
            .dataSource(dataSource)
            .build();
}

Then combine them into a composite writer:

@Bean
public CompositeItemWriter<Customer> compositeItemWriter(DataSource dataSource) {
    return new CompositeItemWriterBuilder<Customer>()
            .delegates(Arrays.asList(writer1(dataSource), writer2(dataSource)))
            .build();
}
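
To make the behaviour of the composite concrete, here is a plain-Java sketch of the fan-out idea, assuming (as with CompositeItemWriter) that every delegate receives the whole chunk, in order. `CompositeFanOutSketch` and its `composite` helper are hypothetical and use `java.util.function.Consumer` instead of Spring Batch types; each item is modelled as a `String[]` holding field1 .. field4.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

final class CompositeFanOutSketch {

    // Hypothetical composite: passes the whole chunk to every delegate, in order.
    static <T> Consumer<List<T>> composite(List<Consumer<List<T>>> delegates) {
        return chunk -> delegates.forEach(delegate -> delegate.accept(chunk));
    }

    public static void main(String[] args) {
        List<String> customerRows = new ArrayList<>();
        List<String> newCustomerRows = new ArrayList<>();

        // Each item is {field1, field2, field3, field4}; each delegate picks its own columns.
        Consumer<List<String[]>> writer1 = items ->
                items.forEach(c -> customerRows.add(c[0] + "," + c[1]));
        Consumer<List<String[]>> writer2 = items ->
                items.forEach(c -> newCustomerRows.add(c[2] + "," + c[3]));

        List<String[]> chunk = new ArrayList<>();
        chunk.add(new String[] {"a1", "a2", "a3", "a4"});
        chunk.add(new String[] {"b1", "b2", "b3", "b4"});

        composite(Arrays.asList(writer1, writer2)).accept(chunk);

        System.out.println(customerRows);
        System.out.println(newCustomerRows);
    }
}
```

Both delegates see both items, so each table gets one row per item, built from a different subset of the fields.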

Obviously, the Customer object returned by your reader should have field1 .. field4.
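
As a rough sketch of what "should have field1 .. field4" means in practice: BeanPropertyItemSqlParameterSourceProvider resolves each named parameter in the SQL through a JavaBean property of the item, so `:field1` needs a `getField1()` getter. The class below is a hypothetical item type, and `NamedParameterLookup` only imitates that lookup with plain reflection; it is not how Spring implements it.

```java
import java.lang.reflect.Method;

// Hypothetical item type; field1..field4 stand for the placeholder names used in the SQL above.
class CustomerItem {
    private final String field1;
    private final String field2;
    private final String field3;
    private final String field4;

    CustomerItem(String field1, String field2, String field3, String field4) {
        this.field1 = field1;
        this.field2 = field2;
        this.field3 = field3;
        this.field4 = field4;
    }

    public String getField1() { return field1; }
    public String getField2() { return field2; }
    public String getField3() { return field3; }
    public String getField4() { return field4; }
}

final class NamedParameterLookup {

    // Rough imitation of bean-property lookup: ":field1" is resolved through getField1().
    static Object valueOf(Object bean, String parameterName) {
        String getter = "get"
                + Character.toUpperCase(parameterName.charAt(0))
                + parameterName.substring(1);
        try {
            Method method = bean.getClass().getMethod(getter);
            return method.invoke(bean);
        } catch (ReflectiveOperationException e) {
            // A placeholder without a matching getter cannot be bound.
            throw new IllegalArgumentException("No readable property: " + parameterName, e);
        }
    }
}
```

If a placeholder in one of the INSERT statements has no matching getter on the item, the lookup fails, which is why the reader's item type has to expose every field that either writer refers to.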