2nd step not inserting data
In my code below, I have defined 2 steps for a job, where each step reads data from a different csv. The data read by the first step is inserted into the database, but the second step does not insert any data. Could you please help point out the mistake?
@Configuration
@EnableBatchProcessing
public class MacroSimulatorConfiguration {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Bean
    public ItemReader<Consumption> reader() {
        FlatFileItemReader<Consumption> reader = new FlatFileItemReader<Consumption>();
        reader.setResource(new ClassPathResource("datacons.csv"));
        reader.setLinesToSkip(1);
        reader.setLineMapper(new DefaultLineMapper<Consumption>() {
            {
                setLineTokenizer(new DelimitedLineTokenizer() {
                    {
                        setNames(new String[] { "tradeCommodity", "hou", "region", "dir", "purchValue", "value" });
                    }
                });
                setFieldSetMapper(new BeanWrapperFieldSetMapper<Consumption>() {
                    {
                        setTargetType(Consumption.class);
                    }
                });
            }
        });
        return reader;
    }

    @Bean
    public ItemReader<Gdp> reader1() {
        FlatFileItemReader<Gdp> reader1 = new FlatFileItemReader<Gdp>();
        reader1.setResource(new ClassPathResource("datagdp.csv"));
        reader1.setLinesToSkip(1);
        reader1.setLineMapper(new DefaultLineMapper<Gdp>() {
            {
                setLineTokenizer(new DelimitedLineTokenizer() {
                    {
                        setNames(new String[] { "region", "gdpExpend", "value" });
                    }
                });
                setFieldSetMapper(new BeanWrapperFieldSetMapper<Gdp>() {
                    {
                        setTargetType(Gdp.class);
                    }
                });
            }
        });
        return reader1;
    }

    @Bean
    public ItemWriter<Consumption> writer(DataSource dataSource) {
        JdbcBatchItemWriter<Consumption> writer = new JdbcBatchItemWriter<Consumption>();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Consumption>());
        writer.setSql("INSERT INTO INPUT_CONSUMPTION (TRAD_COMM, HOU, SUB_REGION, INCOME_GROUP, CITIZEN_STATUS, REGION, DIR, PURCHVALUE, VAL) "
                + "VALUES (:tradeCommodity, :hou, :subRegion, :incomeGroup, :citizenStatus, :region, :dir, :purchValue, :value)");
        writer.setDataSource(dataSource);
        return writer;
    }

    @Bean
    public ItemWriter<Gdp> writer1(DataSource dataSource) {
        JdbcBatchItemWriter<Gdp> writer1 = new JdbcBatchItemWriter<Gdp>();
        writer1.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Gdp>());
        writer1.setSql("INSERT INTO input_gdp (REGION, GDPEXPEND, VAL) " + "VALUES (:region, :gdpExpend, :value)");
        writer1.setDataSource(dataSource);
        return writer1;
    }

    @Bean
    public Job importJob(Step s1, Step s2) {
        return jobs.get("importJob").incrementer(new RunIdIncrementer()).start(s1).next(s2).build();
    }

    @Bean(name = "s1")
    public Step step1(ItemReader<Consumption> reader, ItemWriter<Consumption> writer) {
        return steps.get("step1").<Consumption, Consumption>chunk(100).reader(reader).writer(writer).build();
    }

    @Bean(name = "s2")
    public Step step2(ItemReader<Gdp> reader1, ItemWriter<Gdp> writer1) {
        return steps.get("step2").<Gdp, Gdp>chunk(1).reader(reader1).writer(writer1).build();
    }
}
Here is what I see in the console. There is a parsing error for the first csv because line 13834 and beyond contain no records. However, the records from the first csv were successfully inserted into the DB, so I assume this parsing error can be ignored. I would like to know whether the reader, writer, step and job are correctly defined for the second csv.
Console:
Job: [SimpleJob: [name=importJob]] launched with the following parameters: [{run.id=1}]
Executing step: [step1]
Encountered an error executing step step1 in job importJob
Parsing error at line: 13834 in resource=[class path resource [datacons.csv]], input=[]
Job: [SimpleJob: [name=importJob]] completed with the following parameters: [{run.id=1}] and the following status: [FAILED]
From what I can see in your console output, your step2 is not executed at all.
This is normal Spring Batch behavior: when a non-skippable error is encountered in a step, that step ends with status FAILED and so does the job, unless you have an .on("FAILED") transition that explicitly overrides the default behavior and routes to another step.
Also, you may wonder why records were still inserted into your database. This is because Spring Batch commits records chunk by chunk, according to the commit-interval you define. Since you set it to 100 for step1, every chunk completed before the chunk containing the erroneous line had already been committed; only the chunk in progress at the time of the error was rolled back.
So, here are 3 solutions:
- Prevent the parsing error in the file itself. The failing input is an empty line (input=[]), so removing trailing blank lines from datacons.csv, or making the reader ignore them, fixes the root cause (see the first sketch after this list).
- Add a skippable exception class (your ParseException, or simply java.lang.Exception). This tells Spring Batch to ignore the error and keep reading the file (see the second sketch below).
- Explicitly declare a transition .on("*") between the first and the second step, so that the second step is launched even if the first one fails. The first file will only be read up to the first error, and the second file will then be read (see the last sketch below).
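
For the first option: the error at line 13834 with input=[] points to a trailing blank line in datacons.csv. If you cannot clean the file, one way to make the reader tolerate blank lines is a custom RecordSeparatorPolicy. This is only a sketch of the idea (the class name is mine, and I have not run it against your file):

import org.springframework.batch.item.file.separator.SimpleRecordSeparatorPolicy;

// Sketch: treat blank lines as "no record at all" instead of parse errors
public class BlankLineRecordSeparatorPolicy extends SimpleRecordSeparatorPolicy {

    @Override
    public boolean isEndOfRecord(String line) {
        // a blank line is never a complete record
        return line.trim().length() != 0 && super.isEndOfRecord(line);
    }

    @Override
    public String postProcess(String record) {
        // discard anything that is empty after trimming
        if (record == null || record.trim().length() == 0) {
            return null;
        }
        return super.postProcess(record);
    }
}

You would then register it in your reader() bean with reader.setRecordSeparatorPolicy(new BlankLineRecordSeparatorPolicy());.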
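
For the second option, here is a sketch of what your step1 bean could look like with fault tolerance enabled. I skip FlatFileParseException here (the exception FlatFileItemReader throws on an unparseable line); the skip limit of 10 is arbitrary, choose what is acceptable for your data:

@Bean(name = "s1")
public Step step1(ItemReader<Consumption> reader, ItemWriter<Consumption> writer) {
    return steps.get("step1")
            .<Consumption, Consumption>chunk(100)
            .reader(reader)
            .writer(writer)
            .faultTolerant()                    // enable skip/retry handling
            .skip(FlatFileParseException.class) // ignore unparseable lines
            .skipLimit(10)                      // fail anyway after 10 skips
            .build();
}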
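
For the third option, a sketch of the importJob bean rewritten with an explicit transition. The .on("*") matches any exit status of s1, including FAILED, so s2 runs either way:

@Bean
public Job importJob(Step s1, Step s2) {
    return jobs.get("importJob")
            .incrementer(new RunIdIncrementer())
            .start(s1)
            .on("*").to(s2) // proceed to s2 whatever s1's exit status is
            .end()
            .build();
}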