Spring 批处理:如何将列表输出为 RepositoryItemReader 的输出
Spring batch: How to output list as output for RepositoryItemReader
您好,我有一个用例,我应该从数据库中读取数据并尝试合并行。例如,我有一个订单和许多 orderDetails。我的 SQL 查询的输出是 orderDetails。 OrderNum 是 orderDetails 中的 FK。因此,现在当我处理 orderDetails 时,我必须按 orderNum 进行过滤并使用外部端点。在我的例子中,我使用的是 RepositoryItemReader
MyReader.java
// Reader from the question: it is declared with List<POJO> as its item type.
// NOTE(review): the reported "POJO cannot be cast to java.util.List" suggests each read
// actually yields a single POJO rather than a list — confirm against the elided body.
public RepositoryItemReader<List<POJO>> readCurrentSeasonOrder() {
// some code
}
MyProcessor.java
// Processor from the question: it takes a whole List<POJO> as one item and returns a POJO2.
// NOTE(review): this signature only fits if the reader really emits lists; the reported
// ClassCastException suggests it receives individual POJO instances instead.
public POJO2 process(List<POJO> items) throws Exception {
// some code
}
当我运行批处理作业时,出现以下异常:
POJO cannot be cast to java.util.List
我找不到适合我的用例的任何示例。非常感谢任何帮助。
“对象列表”不是 Spring 批处理的 chunk-oriented 处理模型中项目的良好封装。
选项 1:您可以修改查询,把订单与订单明细连接(join)起来,并返回 Order 类型的项目。这样,一个项目就是一个 Order。
选项 2:另一种方法是使用驱动查询模式(driving query pattern)。其思路是让 reader 返回 Order 对象,再用一个处理器为每个订单补充其明细。这适用于中小型数据集,但在大型数据集上表现不佳(因为每个项目都要执行一次额外查询)。这是该算法本身的特性,与 Spring Batch 无关。
建议优先选择选项 1:由数据库以高效的方式完成连接,并直接返回已预先填充好明细的订单项目。
编辑:为驱动查询模式添加示例
以下示例展示了驱动查询模式的思路。reader 返回 Person 类型的项目,处理器再为每个 Person 项目补充其地址。请注意 reader 只查询人员的 ID 和姓名,而不查询地址。您可以根据自己的情况将其改写为 Order 和 OrderDetails。
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.sample;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
/**
 * Sample job illustrating the driving query pattern: the reader loads only the
 * {@code id} and {@code name} of each person, and the processor issues one extra
 * query per item to attach that person's address.
 *
 * @author Mahmoud Ben Hassine
 */
@Configuration
@EnableBatchProcessing
public class MyJob {

    /**
     * Builds an embedded H2 database and runs the Spring Batch drop/create
     * schema scripts against it.
     *
     * @return the embedded data source shared by the job repository and the sample tables
     */
    @Bean
    public DataSource dataSource() {
        return new EmbeddedDatabaseBuilder()
                .setType(EmbeddedDatabaseType.H2)
                .addScript("/org/springframework/batch/core/schema-drop-h2.sql")
                .addScript("/org/springframework/batch/core/schema-h2.sql")
                .build();
    }

    /**
     * Exposes a {@link JdbcTemplate} bound to the given data source.
     *
     * @param dataSource the data source to run statements against
     * @return a new template wrapping {@code dataSource}
     */
    @Bean
    public JdbcTemplate jdbcTemplate(DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }

    /**
     * Driving query: reads only the {@code id} and {@code name} columns of
     * {@code person}; the address is deliberately left for the processor.
     *
     * @return a cursor-based reader mapping each row to a {@link Person}
     */
    @Bean
    public JdbcCursorItemReader<Person> itemReader() {
        return new JdbcCursorItemReaderBuilder<Person>()
                .name("personItemReader")
                .dataSource(dataSource())
                .sql("select id, name from person")
                .beanRowMapper(Person.class)
                .build();
    }

    /**
     * Enriches each {@link Person} with its {@link Address} using one additional
     * query per item.
     *
     * <p>{@code queryForObject} expects exactly one matching row; it throws a
     * data-access exception when a person has no address or more than one.
     *
     * @return a processor that sets the address on the incoming person and returns it
     */
    @Bean
    public ItemProcessor<Person, Person> itemProcessor() {
        // Inter-bean reference, same mechanism itemReader() uses for dataSource():
        // avoids field injection into an anonymous class.
        JdbcTemplate jdbcTemplate = jdbcTemplate(dataSource());
        // The row mapper is created once instead of once per processed item.
        BeanPropertyRowMapper<Address> addressMapper = new BeanPropertyRowMapper<>(Address.class);
        return person -> {
            // Varargs overload; the (sql, Object[], RowMapper) overload is deprecated.
            Address address = jdbcTemplate.queryForObject(
                    "select * from address where personId = ?", addressMapper, person.getId());
            person.setAddress(address);
            return person;
        };
    }

    /**
     * Writer that prints every item of the chunk to standard output.
     *
     * @return a writer emitting one {@code "item = ..."} line per person
     */
    @Bean
    public ItemWriter<Person> itemWriter() {
        return items -> {
            for (Person item : items) {
                System.out.println("item = " + item);
            }
        };
    }

    /**
     * Assembles a single-step job wiring reader, processor and writer with a
     * chunk size of 2.
     *
     * @param jobs factory used to create the job named {@code "job"}
     * @param steps factory used to create the step named {@code "step"}
     * @return the configured job
     */
    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
                .start(steps.get("step")
                        .<Person, Person>chunk(2)
                        .reader(itemReader())
                        .processor(itemProcessor())
                        .writer(itemWriter())
                        .build())
                .build();
    }

    /**
     * Boots the application context, creates and populates the sample tables,
     * then launches the job with empty parameters.
     *
     * @param args unused
     * @throws Exception if launching the job fails
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the context once the launch call returns.
        // NOTE(review): assumes the configured JobLauncher runs the job synchronously — confirm
        // if a custom (asynchronous) launcher is ever plugged in.
        try (AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class)) {
            JdbcTemplate jdbcTemplate = context.getBean(JdbcTemplate.class);
            // DDL goes through execute(); update() is meant for insert/update/delete statements.
            jdbcTemplate.execute("CREATE TABLE address (id INT IDENTITY NOT NULL PRIMARY KEY, personId INT, street VARCHAR(20));");
            jdbcTemplate.execute("CREATE TABLE person (id INT IDENTITY NOT NULL PRIMARY KEY, name VARCHAR(20));");
            jdbcTemplate.update("INSERT INTO address (id, personId, street) VALUES (1,1, 'oxford street');");
            jdbcTemplate.update("INSERT INTO address (id, personId, street) VALUES (2,2, 'howard street');");
            jdbcTemplate.update("INSERT INTO person (id, name) VALUES (1, 'foo');");
            jdbcTemplate.update("INSERT INTO person (id, name) VALUES (2, 'bar');");

            JobLauncher jobLauncher = context.getBean(JobLauncher.class);
            Job job = context.getBean(Job.class);
            jobLauncher.run(job, new JobParameters());
        }
    }

    /**
     * Item read by the driving query; {@code address} stays {@code null} until
     * the processor fills it in.
     */
    public static class Person {

        private long id;
        private String name;
        private Address address;

        /** No-arg constructor, needed for bean-style row mapping. */
        public Person() {
        }

        /**
         * @param id the person identifier
         * @param name the person name
         */
        public Person(long id, String name) {
            this.id = id;
            this.name = name;
        }

        public long getId() {
            return id;
        }

        public void setId(long id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Address getAddress() {
            return address;
        }

        public void setAddress(Address address) {
            this.address = address;
        }

        @Override
        public String toString() {
            return "Person{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    ", address=" + address +
                    '}';
        }
    }

    /** Row of the {@code address} table, linked to a person through {@code personId}. */
    public static class Address {

        private int id;
        private int personId;
        private String street;

        /** No-arg constructor, needed for bean-style row mapping. */
        public Address() {
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getStreet() {
            return street;
        }

        public void setStreet(String street) {
            this.street = street;
        }

        public int getPersonId() {
            return personId;
        }

        public void setPersonId(int personId) {
            this.personId = personId;
        }

        // personId is intentionally not printed, matching the sample's original output format.
        @Override
        public String toString() {
            return "Address{" +
                    "id=" + id +
                    ", street='" + street + '\'' +
                    '}';
        }
    }
}
您好,我有一个用例,我应该从数据库中读取数据并尝试合并行。例如,我有一个订单和许多 orderDetails。我的 SQL 查询的输出是 orderDetails。 OrderNum 是 orderDetails 中的 FK。因此,现在当我处理 orderDetails 时,我必须按 orderNum 进行过滤并使用外部端点。在我的例子中,我使用的是 RepositoryItemReader
MyReader.java
// Reader from the question: it is declared with List<POJO> as its item type.
// NOTE(review): the reported "POJO cannot be cast to java.util.List" suggests each read
// actually yields a single POJO rather than a list — confirm against the elided body.
public RepositoryItemReader<List<POJO>> readCurrentSeasonOrder() {
// some code
}
MyProcessor.java
// Processor from the question: it takes a whole List<POJO> as one item and returns a POJO2.
// NOTE(review): this signature only fits if the reader really emits lists; the reported
// ClassCastException suggests it receives individual POJO instances instead.
public POJO2 process(List<POJO> items) throws Exception {
// some code
}
当我运行批处理作业时,出现以下异常:
POJO cannot be cast to java.util.List
我找不到适合我的用例的任何示例。非常感谢任何帮助。
“对象列表”不是 Spring 批处理的 chunk-oriented 处理模型中项目的良好封装。
选项 1:您可以修改查询,把订单与订单明细连接(join)起来,并返回 Order 类型的项目。这样,一个项目就是一个 Order。
选项 2:另一种方法是使用驱动查询模式(driving query pattern)。其思路是让 reader 返回 Order 对象,再用一个处理器为每个订单补充其明细。这适用于中小型数据集,但在大型数据集上表现不佳(因为每个项目都要执行一次额外查询)。这是该算法本身的特性,与 Spring Batch 无关。
建议优先选择选项 1:由数据库以高效的方式完成连接,并直接返回已预先填充好明细的订单项目。
编辑:为驱动查询模式添加示例
以下示例展示了驱动查询模式的思路。reader 返回 Person 类型的项目,处理器再为每个 Person 项目补充其地址。请注意 reader 只查询人员的 ID 和姓名,而不查询地址。您可以根据自己的情况将其改写为 Order 和 OrderDetails。
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.sample;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
/**
 * Sample job illustrating the driving query pattern: the reader loads only the
 * {@code id} and {@code name} of each person, and the processor issues one extra
 * query per item to attach that person's address.
 *
 * @author Mahmoud Ben Hassine
 */
@Configuration
@EnableBatchProcessing
public class MyJob {

    /**
     * Builds an embedded H2 database and runs the Spring Batch drop/create
     * schema scripts against it.
     *
     * @return the embedded data source shared by the job repository and the sample tables
     */
    @Bean
    public DataSource dataSource() {
        return new EmbeddedDatabaseBuilder()
                .setType(EmbeddedDatabaseType.H2)
                .addScript("/org/springframework/batch/core/schema-drop-h2.sql")
                .addScript("/org/springframework/batch/core/schema-h2.sql")
                .build();
    }

    /**
     * Exposes a {@link JdbcTemplate} bound to the given data source.
     *
     * @param dataSource the data source to run statements against
     * @return a new template wrapping {@code dataSource}
     */
    @Bean
    public JdbcTemplate jdbcTemplate(DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }

    /**
     * Driving query: reads only the {@code id} and {@code name} columns of
     * {@code person}; the address is deliberately left for the processor.
     *
     * @return a cursor-based reader mapping each row to a {@link Person}
     */
    @Bean
    public JdbcCursorItemReader<Person> itemReader() {
        return new JdbcCursorItemReaderBuilder<Person>()
                .name("personItemReader")
                .dataSource(dataSource())
                .sql("select id, name from person")
                .beanRowMapper(Person.class)
                .build();
    }

    /**
     * Enriches each {@link Person} with its {@link Address} using one additional
     * query per item.
     *
     * <p>{@code queryForObject} expects exactly one matching row; it throws a
     * data-access exception when a person has no address or more than one.
     *
     * @return a processor that sets the address on the incoming person and returns it
     */
    @Bean
    public ItemProcessor<Person, Person> itemProcessor() {
        // Inter-bean reference, same mechanism itemReader() uses for dataSource():
        // avoids field injection into an anonymous class.
        JdbcTemplate jdbcTemplate = jdbcTemplate(dataSource());
        // The row mapper is created once instead of once per processed item.
        BeanPropertyRowMapper<Address> addressMapper = new BeanPropertyRowMapper<>(Address.class);
        return person -> {
            // Varargs overload; the (sql, Object[], RowMapper) overload is deprecated.
            Address address = jdbcTemplate.queryForObject(
                    "select * from address where personId = ?", addressMapper, person.getId());
            person.setAddress(address);
            return person;
        };
    }

    /**
     * Writer that prints every item of the chunk to standard output.
     *
     * @return a writer emitting one {@code "item = ..."} line per person
     */
    @Bean
    public ItemWriter<Person> itemWriter() {
        return items -> {
            for (Person item : items) {
                System.out.println("item = " + item);
            }
        };
    }

    /**
     * Assembles a single-step job wiring reader, processor and writer with a
     * chunk size of 2.
     *
     * @param jobs factory used to create the job named {@code "job"}
     * @param steps factory used to create the step named {@code "step"}
     * @return the configured job
     */
    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
                .start(steps.get("step")
                        .<Person, Person>chunk(2)
                        .reader(itemReader())
                        .processor(itemProcessor())
                        .writer(itemWriter())
                        .build())
                .build();
    }

    /**
     * Boots the application context, creates and populates the sample tables,
     * then launches the job with empty parameters.
     *
     * @param args unused
     * @throws Exception if launching the job fails
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the context once the launch call returns.
        // NOTE(review): assumes the configured JobLauncher runs the job synchronously — confirm
        // if a custom (asynchronous) launcher is ever plugged in.
        try (AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class)) {
            JdbcTemplate jdbcTemplate = context.getBean(JdbcTemplate.class);
            // DDL goes through execute(); update() is meant for insert/update/delete statements.
            jdbcTemplate.execute("CREATE TABLE address (id INT IDENTITY NOT NULL PRIMARY KEY, personId INT, street VARCHAR(20));");
            jdbcTemplate.execute("CREATE TABLE person (id INT IDENTITY NOT NULL PRIMARY KEY, name VARCHAR(20));");
            jdbcTemplate.update("INSERT INTO address (id, personId, street) VALUES (1,1, 'oxford street');");
            jdbcTemplate.update("INSERT INTO address (id, personId, street) VALUES (2,2, 'howard street');");
            jdbcTemplate.update("INSERT INTO person (id, name) VALUES (1, 'foo');");
            jdbcTemplate.update("INSERT INTO person (id, name) VALUES (2, 'bar');");

            JobLauncher jobLauncher = context.getBean(JobLauncher.class);
            Job job = context.getBean(Job.class);
            jobLauncher.run(job, new JobParameters());
        }
    }

    /**
     * Item read by the driving query; {@code address} stays {@code null} until
     * the processor fills it in.
     */
    public static class Person {

        private long id;
        private String name;
        private Address address;

        /** No-arg constructor, needed for bean-style row mapping. */
        public Person() {
        }

        /**
         * @param id the person identifier
         * @param name the person name
         */
        public Person(long id, String name) {
            this.id = id;
            this.name = name;
        }

        public long getId() {
            return id;
        }

        public void setId(long id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Address getAddress() {
            return address;
        }

        public void setAddress(Address address) {
            this.address = address;
        }

        @Override
        public String toString() {
            return "Person{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    ", address=" + address +
                    '}';
        }
    }

    /** Row of the {@code address} table, linked to a person through {@code personId}. */
    public static class Address {

        private int id;
        private int personId;
        private String street;

        /** No-arg constructor, needed for bean-style row mapping. */
        public Address() {
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getStreet() {
            return street;
        }

        public void setStreet(String street) {
            this.street = street;
        }

        public int getPersonId() {
            return personId;
        }

        public void setPersonId(int personId) {
            this.personId = personId;
        }

        // personId is intentionally not printed, matching the sample's original output format.
        @Override
        public String toString() {
            return "Address{" +
                    "id=" + id +
                    ", street='" + street + '\'' +
                    '}';
        }
    }
}