Spring Batch:如何让 RepositoryItemReader 以列表作为输出项

Spring batch: How to output list as output for RepositoryItemReader

您好,我有一个用例:需要从数据库中读取数据并把多行合并起来。例如,一个订单(order)对应多条订单明细(orderDetails)。我的 SQL 查询输出的是 orderDetails,其中 OrderNum 是 orderDetails 中的外键(FK)。因此,在处理 orderDetails 时,我必须按 orderNum 进行过滤,然后调用一个外部端点。在我的例子中,我使用的是 RepositoryItemReader。

MyReader.java

// Reader from the question: declared to emit whole List<POJO> values as items.
// The body was elided in the original post.
public RepositoryItemReader<List<POJO>> readCurrentSeasonOrder() {
// some code
}

MyProcessor.java

// Processor from the question: declared to receive a List<POJO> and return a POJO2.
// The body was elided in the original post.
public POJO2 process(List<POJO> items) throws Exception {
// some code 
}

当我运行批处理作业时,出现如下异常:

POJO cannot be cast to java.util.List

我找不到适合我的用例的任何示例。非常感谢任何帮助。

“对象列表”不是 Spring 批处理的 chunk-oriented 处理模型中项目的良好封装。

选项 1:您可以修改查询,将订单与订单明细连接(join)起来,并返回 Order 类型的项。这样,一个项就是一个 Order。

选项 2:另一种方法是使用驱动查询模式(driving query pattern)。其思路是让 reader 只返回 Order,再用一个处理器为每个订单补充其明细。这种方式适用于中小型数据集,但在大型数据集上性能不佳(因为每个项都要额外执行一次查询)。这是该算法本身的特点,与 Spring Batch 无关。

建议优先选择选项 1:由数据库以高效的方式完成连接,并直接返回已预先填充好明细的订单项。

编辑:为驱动查询模式添加示例

以下示例展示了驱动查询模式的思路。reader 返回 Person 类型的项,处理器再为每个 Person 项补充其地址。请注意,reader 只读取人员的 id 和 name,而不读取地址。您可以根据自己的情况将其改写为 Order/OrderDetails。

/*
 * Copyright 2022 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.batch.sample;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;

/**
 * Self-contained sample of the driving query pattern: the reader fetches only
 * the id and name of each {@link Person}, and the processor runs one extra
 * query per item to attach the matching {@link Address}.
 *
 * @author Mahmoud Ben Hassine
 */
@Configuration
@EnableBatchProcessing
public class MyJob {

    /** Per-item lookup executed by the processor; the parameter is the person id. */
    private static final String ADDRESS_BY_PERSON_SQL = "select * from address where personId = ?";

    /**
     * Creates the embedded H2 database and runs the Spring Batch schema scripts against it.
     *
     * @return the data source shared by the job repository and the sample tables
     */
    @Bean
    public DataSource dataSource() {
        return new EmbeddedDatabaseBuilder()
                .setType(EmbeddedDatabaseType.H2)
                .addScript("/org/springframework/batch/core/schema-drop-h2.sql")
                .addScript("/org/springframework/batch/core/schema-h2.sql")
                .build();
    }

    /**
     * Exposes a {@link JdbcTemplate} bound to the given data source.
     *
     * @param dataSource the data source to run statements against
     * @return a template used by the processor and by {@link #main(String[])}
     */
    @Bean
    public JdbcTemplate jdbcTemplate(DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }

    /**
     * Driving query: selects only {@code id} and {@code name} from the
     * {@code person} table and maps each row onto a {@link Person} bean.
     *
     * @return a cursor-based reader of persons without their address
     */
    @Bean
    public JdbcCursorItemReader<Person> itemReader() {
        return new JdbcCursorItemReaderBuilder<Person>()
                .name("personItemReader")
                .dataSource(dataSource())
                .sql("select id, name from person")
                .beanRowMapper(Person.class)
                .build();
    }

    /**
     * Enriches each person with its address by issuing one additional query per item.
     *
     * <p>{@code queryForObject} expects exactly one matching row; per the Spring
     * documentation it raises an {@code IncorrectResultSizeDataAccessException}
     * otherwise, which would fail the step.
     *
     * @return a processor that sets {@link Person#setAddress(Address)} and returns the same instance
     */
    @Bean
    public ItemProcessor<Person, Person> itemProcessor() {
        // Same inter-bean call style as itemReader(): the template is captured
        // explicitly instead of being field-injected into an anonymous class.
        final JdbcTemplate jdbcTemplate = jdbcTemplate(dataSource());
        // One mapper instance is reused for every lookup.
        final BeanPropertyRowMapper<Address> addressMapper = new BeanPropertyRowMapper<>(Address.class);

        return person -> {
            // Varargs overload; the (String, Object[], RowMapper) variant is deprecated.
            Address address = jdbcTemplate.queryForObject(ADDRESS_BY_PERSON_SQL, addressMapper, person.getId());
            person.setAddress(address);
            return person;
        };
    }

    /**
     * Prints every item of the chunk to standard output.
     *
     * @return a writer that only logs the enriched persons
     */
    @Bean
    public ItemWriter<Person> itemWriter() {
        return items -> {
            for (Person item : items) {
                System.out.println("item = " + item);
            }
        };
    }

    /**
     * Assembles the single-step job: read persons, enrich them, print them,
     * with a commit interval of two items.
     *
     * @param jobs factory for job builders
     * @param steps factory for step builders
     * @return the sample job
     */
    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
                .start(steps.get("step")
                        .<Person, Person>chunk(2)
                        .reader(itemReader())
                        .processor(itemProcessor())
                        .writer(itemWriter())
                        .build())
                .build();
    }

    /**
     * Boots the application context, creates and populates the sample tables,
     * then launches the job once.
     *
     * @param args unused
     * @throws Exception if the job cannot be launched
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the context (and the beans it manages) even if the launch fails.
        try (AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class)) {
            JdbcTemplate jdbcTemplate = context.getBean(JdbcTemplate.class);

            // DDL goes through execute(); update() is kept for the data-changing statements.
            jdbcTemplate.execute("CREATE TABLE address (id INT IDENTITY NOT NULL PRIMARY KEY, personId INT, street VARCHAR(20));");
            jdbcTemplate.execute("CREATE TABLE person (id INT IDENTITY NOT NULL PRIMARY KEY, name VARCHAR(20));");
            jdbcTemplate.update("INSERT INTO address (id, personId, street) VALUES (1,1, 'oxford street');");
            jdbcTemplate.update("INSERT INTO address (id, personId, street) VALUES (2,2, 'howard street');");
            jdbcTemplate.update("INSERT INTO person (id, name) VALUES (1, 'foo');");
            jdbcTemplate.update("INSERT INTO person (id, name) VALUES (2, 'bar');");

            JobLauncher jobLauncher = context.getBean(JobLauncher.class);
            Job job = context.getBean(Job.class);
            jobLauncher.run(job, new JobParameters());
        }
    }

    /**
     * Item type of the step. The no-arg constructor and the setters are needed
     * because rows are mapped onto it as a bean.
     */
    public static class Person {
        private long id;
        private String name;
        /** Left unset by the reader; filled in by the processor. */
        private Address address;

        public Person() {
        }

        public Person(long id, String name) {
            this.id = id;
            this.name = name;
        }

        public long getId() {
            return id;
        }

        public void setId(long id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Address getAddress() {
            return address;
        }

        public void setAddress(Address address) {
            this.address = address;
        }

        @Override
        public String toString() {
            return "Person{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    ", address=" + address +
                    '}';
        }
    }

    /**
     * Detail row attached to a {@link Person}; {@code personId} refers to the owning person.
     */
    public static class Address {
        private int id;
        private int personId;
        private String street;

        public Address() {
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getStreet() {
            return street;
        }

        public void setStreet(String street) {
            this.street = street;
        }

        public int getPersonId() {
            return personId;
        }

        public void setPersonId(int personId) {
            this.personId = personId;
        }

        /** Renders the id and street only; {@code personId} is not part of the output. */
        @Override
        public String toString() {
            return "Address{" +
                    "id=" + id +
                    ", street='" + street + '\'' +
                    '}';
        }
    }

}