我们可以使用 spring Batch 顺序处理多个文件,而多个线程用于处理单个文件数据吗?
can we process the multiple files sequentially using spring Batch while multiple threads used to process individual files data..?
我想按顺序处理多个文件,每个文件都需要在多个线程的帮助下处理,所以使用了 spring 批处理 FlatFileItemReader 和 TaskExecutor,它对我来说似乎工作正常。如要求中所述,我们必须处理多个文件,因此与 FlatFileItemReader 一起,我正在使用 MultiResourceItemReader,它将获取多个文件并在我遇到问题的地方一个一个地处理。有人可以帮助我异常的原因是什么吗?修复它的方法是什么..?
org.springframework.batch.item.ReaderNotOpenException: Reader must be open before it can be read.
at org.springframework.batch.item.file.FlatFileItemReader.readLine(FlatFileItemReader.java:195) ~[spring-batch-infrastructure-3.0.5.RELEASE.jar:3.0.5.RELEASE]
at org.springframework.batch.item.file.FlatFileItemReader.doRead(FlatFileItemReader.java:173) ~[spring-batch-infrastructure-3.0.5.RELEASE.jar:3.0.5.RELEASE]
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:88) ~[spring-batch-infrastructure-3.0.5.RELEASE.jar:3.0.5.RELEASE]
at org.springframework.batch.item.file.MultiResourceItemReader.readFromDelegate(MultiResourceItemReader.java:140) ~[spring-batch-infrastructure-3.0.5.RELEASE.jar:3.0.5.RELEASE]
at org.springframework.batch.item.file.MultiResourceItemReader.readNextItem(MultiResourceItemReader.java:119)
customer2.csv
200,Zoe,Nelson,1973-01-12 17:19:30
201,Vivian,Love,1951-10-31 08:57:08
202,Charde,Lang,1967-02-23 12:24:26
customer3.csv
400,Amelia,Osborn,1972-05-09 09:21:22
401,Gemma,Finch,1989-09-25 23:00:59
402,Orli,Slater,1959-03-30 15:54:32
403,Donovan,Beasley,1986-06-18 14:50:30
customer4.csv
600,Zelenia,Henson,1982-07-03 03:28:39
601,Thomas,Mathews,1954-11-21 20:34:03
602,Kevyn,Whitney,1984-09-21 06:24:25
603,Marny,Leon,1984-06-10 21:32:09
604,Jarrod,Gay,1960-06-22 19:11:04
customer5.csv
800,Imogene,Lee,1966-10-19 17:53:44
801,Mira,Franks,1964-03-08 09:47:43
802,Silas,Dixon,1953-04-11 01:37:51
803,Paloma,Daniels,1962-06-14 17:01:02
我的代码:
@Bean
public MultiResourceItemReader<Customer> multiResourceItemReader() {
System.out.println("In multiResourceItemReader");
MultiResourceItemReader<Customer> reader = new MultiResourceItemReader<>();
reader.setDelegate(customerItemReader());
reader.setResources(inputFiles);
return reader;
}
@Bean
public FlatFileItemReader<Customer> customerItemReader() {
FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
DefaultLineMapper<Customer> customerLineMapper = new DefaultLineMapper<>();
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames(new String[] {"id", "firstName", "lastName", "birthdate"});
customerLineMapper.setLineTokenizer(tokenizer);
customerLineMapper.setFieldSetMapper(new CustomerFieldSetMapper());
customerLineMapper.afterPropertiesSet();
reader.setLineMapper(customerLineMapper);
return reader;
}
下面的片段在使用下面时工作正常:
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<Customer, Customer>chunk(100).
reader(customerItemReader())
.writer(customerItemWriter()).taskExecutor(taskExecutor()).throttleLimit(10)
.build();
}
}
波纹管代码段无法正常工作得到上述异常
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<Customer, Customer>chunk(100).
reader(multiResourceItemReader())
.writer(customerItemWriter()).taskExecutor(taskExecutor()).throttleLimit(10)
.build();
}
由于您在多线程步骤中使用了 reader,一个线程可能关闭了当前文件,而另一个线程正在尝试同时读取该文件。您需要将对 reader 的访问与 SynchronizedItemStreamReader
:
同步
@Bean
public SynchronizedItemStreamReader<Customer> multiResourceItemReader() {
System.out.println("In multiResourceItemReader");
MultiResourceItemReader<Customer> reader = new MultiResourceItemReader<>();
reader.setDelegate(customerItemReader());
reader.setResources(inputFiles);
SynchronizedItemStreamReader<Customer> synchronizedItemStreamReader = new SynchronizedItemStreamReader<>();
synchronizedItemStreamReader.setDelegate(reader);
return synchronizedItemStreamReader;
}
我想按顺序处理多个文件,每个文件都需要在多个线程的帮助下处理,所以使用了 spring 批处理 FlatFileItemReader 和 TaskExecutor,它对我来说似乎工作正常。如要求中所述,我们必须处理多个文件,因此与 FlatFileItemReader 一起,我正在使用 MultiResourceItemReader,它将获取多个文件并在我遇到问题的地方一个一个地处理。有人可以帮助我异常的原因是什么吗?修复它的方法是什么..?
org.springframework.batch.item.ReaderNotOpenException: Reader must be open before it can be read.
at org.springframework.batch.item.file.FlatFileItemReader.readLine(FlatFileItemReader.java:195) ~[spring-batch-infrastructure-3.0.5.RELEASE.jar:3.0.5.RELEASE]
at org.springframework.batch.item.file.FlatFileItemReader.doRead(FlatFileItemReader.java:173) ~[spring-batch-infrastructure-3.0.5.RELEASE.jar:3.0.5.RELEASE]
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:88) ~[spring-batch-infrastructure-3.0.5.RELEASE.jar:3.0.5.RELEASE]
at org.springframework.batch.item.file.MultiResourceItemReader.readFromDelegate(MultiResourceItemReader.java:140) ~[spring-batch-infrastructure-3.0.5.RELEASE.jar:3.0.5.RELEASE]
at org.springframework.batch.item.file.MultiResourceItemReader.readNextItem(MultiResourceItemReader.java:119)
customer2.csv
200,Zoe,Nelson,1973-01-12 17:19:30
201,Vivian,Love,1951-10-31 08:57:08
202,Charde,Lang,1967-02-23 12:24:26
customer3.csv
400,Amelia,Osborn,1972-05-09 09:21:22
401,Gemma,Finch,1989-09-25 23:00:59
402,Orli,Slater,1959-03-30 15:54:32
403,Donovan,Beasley,1986-06-18 14:50:30
customer4.csv
600,Zelenia,Henson,1982-07-03 03:28:39
601,Thomas,Mathews,1954-11-21 20:34:03
602,Kevyn,Whitney,1984-09-21 06:24:25
603,Marny,Leon,1984-06-10 21:32:09
604,Jarrod,Gay,1960-06-22 19:11:04
customer5.csv
800,Imogene,Lee,1966-10-19 17:53:44
801,Mira,Franks,1964-03-08 09:47:43
802,Silas,Dixon,1953-04-11 01:37:51
803,Paloma,Daniels,1962-06-14 17:01:02
我的代码:
@Bean
public MultiResourceItemReader<Customer> multiResourceItemReader() {
System.out.println("In multiResourceItemReader");
MultiResourceItemReader<Customer> reader = new MultiResourceItemReader<>();
reader.setDelegate(customerItemReader());
reader.setResources(inputFiles);
return reader;
}
@Bean
public FlatFileItemReader<Customer> customerItemReader() {
FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
DefaultLineMapper<Customer> customerLineMapper = new DefaultLineMapper<>();
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames(new String[] {"id", "firstName", "lastName", "birthdate"});
customerLineMapper.setLineTokenizer(tokenizer);
customerLineMapper.setFieldSetMapper(new CustomerFieldSetMapper());
customerLineMapper.afterPropertiesSet();
reader.setLineMapper(customerLineMapper);
return reader;
}
下面的片段在使用下面时工作正常:
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<Customer, Customer>chunk(100).
reader(customerItemReader())
.writer(customerItemWriter()).taskExecutor(taskExecutor()).throttleLimit(10)
.build();
}
}
波纹管代码段无法正常工作得到上述异常
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<Customer, Customer>chunk(100).
reader(multiResourceItemReader())
.writer(customerItemWriter()).taskExecutor(taskExecutor()).throttleLimit(10)
.build();
}
由于您在多线程步骤中使用了 reader,一个线程可能关闭了当前文件,而另一个线程正在尝试同时读取该文件。您需要将对 reader 的访问与 SynchronizedItemStreamReader
:
@Bean
public SynchronizedItemStreamReader<Customer> multiResourceItemReader() {
System.out.println("In multiResourceItemReader");
MultiResourceItemReader<Customer> reader = new MultiResourceItemReader<>();
reader.setDelegate(customerItemReader());
reader.setResources(inputFiles);
SynchronizedItemStreamReader<Customer> synchronizedItemStreamReader = new SynchronizedItemStreamReader<>();
synchronizedItemStreamReader.setDelegate(reader);
return synchronizedItemStreamReader;
}