Spring 批处理编码的压缩文件
Spring Batch process an encoded zipped file
我正在研究使用 spring 批处理来处理编码压缩文件中的记录。这些记录是可变长度的,其中嵌套了可变长度数据字段。
我是 Spring 和 Spring 批处理的新手,这就是我计划构建批处理配置的方式。
- ItemReader 需要从压缩 (*.gz) 文件输入流中读取一条记录到 POJO(字节数组)中,该记录的长度将包含在流的前两个字节中。
- ItemProcessor 将解码字节数组并将信息存储在 POJO 的相关属性中。
- ItemWriter 将填充数据库。
我最初的问题是了解如何设置 ItemReader,我看过一些使用 FlatFileItemReader 的示例,但我的困难是期望有一个 Line Mapper。在我的情况下,我不知道如何做到这一点(文件中没有行的概念)。
有一些 articles 指示自定义 BufferedReaderFactory 的使用,但很高兴看到一个有效的示例。
不胜感激。
如果压缩后的文件是一个简单的txt文件,你只需要一个custum BufferedReaderFactory,linemaper然后获取当前行的String
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import org.springframework.batch.item.file.BufferedReaderFactory;
import org.springframework.core.io.Resource;
public class GZipBufferedReaderFactory implements BufferedReaderFactory {
/** Default value for gzip suffixes. */
private List<String> gzipSuffixes = new ArrayList<String>() {
{
add(".gz");
add(".gzip");
}
};
/**
* Creates Bufferedreader for gzip Resource, handles normal resources
* too.
*
* @param resource
* @param encoding
* @return
* @throws UnsupportedEncodingException
* @throws IOException
*/
@Override
public BufferedReader create(Resource resource, String encoding)
throws UnsupportedEncodingException, IOException {
for (String suffix : gzipSuffixes) {
// test for filename and description, description is used when
// handling itemStreamResources
if (resource.getFilename().endsWith(suffix)
|| resource.getDescription().endsWith(suffix)) {
return new BufferedReader(new InputStreamReader(new GZIPInputStream(resource.getInputStream()), encoding));
}
}
return new BufferedReader(new InputStreamReader(resource.getInputStream(), encoding));
}
public List<String> getGzipSuffixes() {
return gzipSuffixes;
}
public void setGzipSuffixes(List<String> gzipSuffixes) {
this.gzipSuffixes = gzipSuffixes;
}
}
简单的项目阅读器配置:
<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
<property name="resource" value="#{jobParameters['input.file']}" />
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.PassThroughLineMapper" />
</property>
<property name="strict" value="true" />
<property name="bufferedReaderFactory">
<bean class="your.custom.GZipBufferedReaderFactory" />
</property>
</bean>
我的困惑是基于自定义 ItemReader 中的文件处理,如果我要在 read() 方法中打开和处理文件,我将不得不跟踪我在文件中的位置等。我通过在自定义 ItemReader 的构造函数中创建 BufferedInputStream (BufferedInputStream(new GZIPInputStream(new FileInputStream(file))) 设法解决了这个问题,然后在 read() 方法中处理该流并在步骤的每次迭代中处理该流。
从功能请求票到spring批次(https://jira.spring.io/browse/BATCH-1750):
public class GZIPResource extends InputStreamResource implements Resource {
public GZIPResource(Resource delegate) throws IOException {
super(new GZIPInputStream(delegate.getInputStream()));
}
}
自定义 GZipBufferedReaderFactory
不适用于 FlatFileItemReader
。
编辑:惰性版本。在调用 getInputStream
之前,这不会尝试打开文件。如果您在程序初始化时创建资源(例如使用自动装配),这可以避免由于文件不存在而导致的异常。
public class GzipLazyResource extends FileSystemResource implements Resource {
public GzipLazyResource(File file) {
super(file);
}
public GzipLazyResource(String path) {
super(path);
}
@Override
public InputStream getInputStream() throws IOException {
return new GZIPInputStream(super.getInputStream());
}
}
Edit2:这仅适用于输入资源
添加另一个类似的方法 getOutputStream
将不起作用,因为 spring 使用 FileSystemResource.getFile
,而不是 FileSystemResource.getOutputStream
。
经过测试,这种从 S3 中的压缩和编码文件中读取行的简单配置有效。
要点:
- 实现一个使用 Apache 的
GZIPInputStreamFactory
的 BufferedReaderFactory
,并将其设置为 FlatFileItemReader
上的 bufferedReaderFactory。
- 使用
AmazonS3Client
从 Spring Cloud 配置一个 SimpleStorageResourceLoader
,并使用它在 S3 中获取压缩的平面文件。将其设置为 FlatFileItemReader
. 上的资源
注意:读入字符串可以很容易地替换为读入 POJO。
GZIPBufferedReaderFactory.java
使用 Apache 的 GZIPInputStreamFactory
public class GZIPBufferedReaderFactory implements BufferedReaderFactory {
private final GZIPInputStreamFactory gzipInputStreamFactory;
public GZIPBufferedReaderFactory(GZIPInputStreamFactory gzipInputStreamFactory) {
this.gzipInputStreamFactory = gzipInputStreamFactory;
}
@Override
public BufferedReader create(Resource resource, String encoding) throws IOException {
return new BufferedReader(new InputStreamReader(gzipInputStreamFactory.create(resource.getInputStream()), encoding));
}
}
AWSConfiguration.java
@Configuration
public class AWSConfiguration {
@Bean
public AmazonS3Client s3Client(AWSCredentialsProvider credentials, Region region) {
ClientConfiguration clientConfig = new ClientConfiguration();
AmazonS3Client client = new AmazonS3Client(credentials, clientConfig);
client.setRegion(region);
return client;
}
}
如何配置 AWSCredentialsProvider
和 Region
bean 可能会有所不同,我不会在这里详细说明,因为其他地方有文档。
BatchConfiguration.java
@Configuration
@EnableBatchProcessing
public class SignalsIndexBatchConfiguration {
@Autowired
public AmazonS3Client s3Client;
@Bean
public GZIPInputStreamFactory gzipInputStreamFactory() {
return new GZIPInputStreamFactory();
}
@Bean
public GZIPBufferedReaderFactory gzipBufferedReaderFactory(GZIPInputStreamFactory gzipInputStreamFactory) {
return new GZIPBufferedReaderFactory(gzipInputStreamFactory);
}
@Bean
public SimpleStorageResourceLoader simpleStorageResourceLoader() {
return new SimpleStorageResourceLoader(s3Client);
}
@Bean
@StepScope
protected FlatFileItemReader<String> itemReader(
SimpleStorageResourceLoader simpleStorageResourceLoader,
GZIPBufferedReaderFactory gzipBufferedReaderFactory) {
FlatFileItemReader<String> flatFileItemReader = new FlatFileItemReader<>();
flatFileItemReader.setBufferedReaderFactory(gzipBufferedReaderFactory);
flatFileItemReader.setResource(simpleStorageResourceLoader.getResource("s3://YOUR_FLAT_FILE.csv"));
flatFileItemReader.setLineMapper(new PassThroughLineMapper());
return flatFileItemReader;
}
@Bean
public Job job(Step step) {
return jobBuilderFactory.get("job").start(step).build();
}
@Bean
protected Step step(GZIPInputStreamFactory gzipInputStreamFactory) {
return stepBuilderFactory.get("step")
.<String, String> chunk(200)
.reader(itemReader(simpleStorageResourceLoader(), gzipBufferedReaderFactory(gzipInputStreamFactory)))
.processor(itemProcessor())
.faultTolerant()
.build();
}
/*
* These components are some of what we
* get for free with the @EnableBatchProcessing annotation
*/
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Autowired
public JobRepository jobRepository;
/*
* END Freebies
*/
@Bean
public JobLauncher jobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
}
我正在研究使用 spring 批处理来处理编码压缩文件中的记录。这些记录是可变长度的,其中嵌套了可变长度数据字段。
我是 Spring 和 Spring 批处理的新手,这就是我计划构建批处理配置的方式。
- ItemReader 需要从压缩 (*.gz) 文件输入流中读取一条记录到 POJO(字节数组)中,该记录的长度将包含在流的前两个字节中。
- ItemProcessor 将解码字节数组并将信息存储在 POJO 的相关属性中。
- ItemWriter 将填充数据库。
我最初的问题是了解如何设置 ItemReader,我看过一些使用 FlatFileItemReader 的示例,但我的困难是期望有一个 Line Mapper。在我的情况下,我不知道如何做到这一点(文件中没有行的概念)。
有一些 articles 指示自定义 BufferedReaderFactory 的使用,但很高兴看到一个有效的示例。
不胜感激。
如果压缩后的文件是一个简单的txt文件,你只需要一个custum BufferedReaderFactory,linemaper然后获取当前行的String
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import org.springframework.batch.item.file.BufferedReaderFactory;
import org.springframework.core.io.Resource;
public class GZipBufferedReaderFactory implements BufferedReaderFactory {
/** Default value for gzip suffixes. */
private List<String> gzipSuffixes = new ArrayList<String>() {
{
add(".gz");
add(".gzip");
}
};
/**
* Creates Bufferedreader for gzip Resource, handles normal resources
* too.
*
* @param resource
* @param encoding
* @return
* @throws UnsupportedEncodingException
* @throws IOException
*/
@Override
public BufferedReader create(Resource resource, String encoding)
throws UnsupportedEncodingException, IOException {
for (String suffix : gzipSuffixes) {
// test for filename and description, description is used when
// handling itemStreamResources
if (resource.getFilename().endsWith(suffix)
|| resource.getDescription().endsWith(suffix)) {
return new BufferedReader(new InputStreamReader(new GZIPInputStream(resource.getInputStream()), encoding));
}
}
return new BufferedReader(new InputStreamReader(resource.getInputStream(), encoding));
}
public List<String> getGzipSuffixes() {
return gzipSuffixes;
}
public void setGzipSuffixes(List<String> gzipSuffixes) {
this.gzipSuffixes = gzipSuffixes;
}
}
简单的项目阅读器配置:
<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
<property name="resource" value="#{jobParameters['input.file']}" />
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.PassThroughLineMapper" />
</property>
<property name="strict" value="true" />
<property name="bufferedReaderFactory">
<bean class="your.custom.GZipBufferedReaderFactory" />
</property>
</bean>
我的困惑是基于自定义 ItemReader 中的文件处理,如果我要在 read() 方法中打开和处理文件,我将不得不跟踪我在文件中的位置等。我通过在自定义 ItemReader 的构造函数中创建 BufferedInputStream (BufferedInputStream(new GZIPInputStream(new FileInputStream(file))) 设法解决了这个问题,然后在 read() 方法中处理该流并在步骤的每次迭代中处理该流。
从功能请求票到spring批次(https://jira.spring.io/browse/BATCH-1750):
public class GZIPResource extends InputStreamResource implements Resource {
public GZIPResource(Resource delegate) throws IOException {
super(new GZIPInputStream(delegate.getInputStream()));
}
}
自定义 GZipBufferedReaderFactory
不适用于 FlatFileItemReader
。
编辑:惰性版本。在调用 getInputStream
之前,这不会尝试打开文件。如果您在程序初始化时创建资源(例如使用自动装配),这可以避免由于文件不存在而导致的异常。
public class GzipLazyResource extends FileSystemResource implements Resource {
public GzipLazyResource(File file) {
super(file);
}
public GzipLazyResource(String path) {
super(path);
}
@Override
public InputStream getInputStream() throws IOException {
return new GZIPInputStream(super.getInputStream());
}
}
Edit2:这仅适用于输入资源
添加另一个类似的方法 getOutputStream
将不起作用,因为 spring 使用 FileSystemResource.getFile
,而不是 FileSystemResource.getOutputStream
。
经过测试,这种从 S3 中的压缩和编码文件中读取行的简单配置有效。
要点:
- 实现一个使用 Apache 的
GZIPInputStreamFactory
的BufferedReaderFactory
,并将其设置为FlatFileItemReader
上的 bufferedReaderFactory。 - 使用
AmazonS3Client
从 Spring Cloud 配置一个SimpleStorageResourceLoader
,并使用它在 S3 中获取压缩的平面文件。将其设置为FlatFileItemReader
. 上的资源
注意:读入字符串可以很容易地替换为读入 POJO。
GZIPBufferedReaderFactory.java
使用 Apache 的 GZIPInputStreamFactory
public class GZIPBufferedReaderFactory implements BufferedReaderFactory {
private final GZIPInputStreamFactory gzipInputStreamFactory;
public GZIPBufferedReaderFactory(GZIPInputStreamFactory gzipInputStreamFactory) {
this.gzipInputStreamFactory = gzipInputStreamFactory;
}
@Override
public BufferedReader create(Resource resource, String encoding) throws IOException {
return new BufferedReader(new InputStreamReader(gzipInputStreamFactory.create(resource.getInputStream()), encoding));
}
}
AWSConfiguration.java
@Configuration
public class AWSConfiguration {
@Bean
public AmazonS3Client s3Client(AWSCredentialsProvider credentials, Region region) {
ClientConfiguration clientConfig = new ClientConfiguration();
AmazonS3Client client = new AmazonS3Client(credentials, clientConfig);
client.setRegion(region);
return client;
}
}
如何配置 AWSCredentialsProvider
和 Region
bean 可能会有所不同,我不会在这里详细说明,因为其他地方有文档。
BatchConfiguration.java
@Configuration
@EnableBatchProcessing
public class SignalsIndexBatchConfiguration {
@Autowired
public AmazonS3Client s3Client;
@Bean
public GZIPInputStreamFactory gzipInputStreamFactory() {
return new GZIPInputStreamFactory();
}
@Bean
public GZIPBufferedReaderFactory gzipBufferedReaderFactory(GZIPInputStreamFactory gzipInputStreamFactory) {
return new GZIPBufferedReaderFactory(gzipInputStreamFactory);
}
@Bean
public SimpleStorageResourceLoader simpleStorageResourceLoader() {
return new SimpleStorageResourceLoader(s3Client);
}
@Bean
@StepScope
protected FlatFileItemReader<String> itemReader(
SimpleStorageResourceLoader simpleStorageResourceLoader,
GZIPBufferedReaderFactory gzipBufferedReaderFactory) {
FlatFileItemReader<String> flatFileItemReader = new FlatFileItemReader<>();
flatFileItemReader.setBufferedReaderFactory(gzipBufferedReaderFactory);
flatFileItemReader.setResource(simpleStorageResourceLoader.getResource("s3://YOUR_FLAT_FILE.csv"));
flatFileItemReader.setLineMapper(new PassThroughLineMapper());
return flatFileItemReader;
}
@Bean
public Job job(Step step) {
return jobBuilderFactory.get("job").start(step).build();
}
@Bean
protected Step step(GZIPInputStreamFactory gzipInputStreamFactory) {
return stepBuilderFactory.get("step")
.<String, String> chunk(200)
.reader(itemReader(simpleStorageResourceLoader(), gzipBufferedReaderFactory(gzipInputStreamFactory)))
.processor(itemProcessor())
.faultTolerant()
.build();
}
/*
* These components are some of what we
* get for free with the @EnableBatchProcessing annotation
*/
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Autowired
public JobRepository jobRepository;
/*
* END Freebies
*/
@Bean
public JobLauncher jobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
}