Spring 批处理编码的压缩文件

Spring Batch process an encoded zipped file

我正在研究使用 spring 批处理来处理编码压缩文件中的记录。这些记录是可变长度的,其中嵌套了可变长度数据字段。

我是 Spring 和 Spring 批处理的新手,这就是我计划构建批处理配置的方式。

我最初的问题是了解如何设置 ItemReader,我看过一些使用 FlatFileItemReader 的示例,但我的困难是期望有一个 Line Mapper。在我的情况下,我不知道如何做到这一点(文件中没有行的概念)。

有一些 articles 指示自定义 BufferedReaderFactory 的使用,但很高兴看到一个有效的示例。

不胜感激。

如果压缩后的文件是一个简单的txt文件,你只需要一个custum BufferedReaderFactory,linemaper然后获取当前行的String

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import org.springframework.batch.item.file.BufferedReaderFactory;
import org.springframework.core.io.Resource;

public class GZipBufferedReaderFactory implements BufferedReaderFactory {

    /** Default value for gzip suffixes. */
    private List<String> gzipSuffixes = new ArrayList<String>() {

        {
            add(".gz");
            add(".gzip");
        }
    };

    /**
     * Creates Bufferedreader for gzip Resource, handles normal resources
     * too.
     * 
     * @param resource
     * @param encoding
     * @return
     * @throws UnsupportedEncodingException
     * @throws IOException 
     */
    @Override
    public BufferedReader create(Resource resource, String encoding)
            throws UnsupportedEncodingException, IOException {
        for (String suffix : gzipSuffixes) {
            // test for filename and description, description is used when 
            // handling itemStreamResources
            if (resource.getFilename().endsWith(suffix)
                    || resource.getDescription().endsWith(suffix)) {
                return new BufferedReader(new InputStreamReader(new GZIPInputStream(resource.getInputStream()), encoding));
            }
        }
        return new BufferedReader(new InputStreamReader(resource.getInputStream(), encoding));
    }

    public List<String> getGzipSuffixes() {
        return gzipSuffixes;
    }

    public void setGzipSuffixes(List<String> gzipSuffixes) {
        this.gzipSuffixes = gzipSuffixes;
    }
}

简单的项目阅读器配置:

 <bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
  <property name="resource" value="#{jobParameters['input.file']}" />
  <property name="lineMapper">
    <bean class="org.springframework.batch.item.file.mapping.PassThroughLineMapper" />
  </property>
  <property name="strict" value="true" />
  <property name="bufferedReaderFactory">
    <bean class="your.custom.GZipBufferedReaderFactory" />
  </property>
</bean>

我的困惑是基于自定义 ItemReader 中的文件处理,如果我要在 read() 方法中打开和处理文件,我将不得不跟踪我在文件中的位置等。我通过在自定义 ItemReader 的构造函数中创建 BufferedInputStream (BufferedInputStream(new GZIPInputStream(new FileInputStream(file))) 设法解决了这个问题,然后在 read() 方法中处理该流并在步骤的每次迭代中处理该流。

从功能请求票到spring批次(https://jira.spring.io/browse/BATCH-1750):

public class GZIPResource extends InputStreamResource implements Resource {

    public GZIPResource(Resource delegate) throws IOException {
        super(new GZIPInputStream(delegate.getInputStream()));
    }
}

自定义 GZipBufferedReaderFactory 不适用于 FlatFileItemReader

编辑:惰性版本。在调用 getInputStream 之前,这不会尝试打开文件。如果您在程序初始化时创建资源(例如使用自动装配),这可以避免由于文件不存在而导致的异常。

public class GzipLazyResource extends FileSystemResource implements Resource {

    public GzipLazyResource(File file) {
        super(file);
    }

    public GzipLazyResource(String path) {
        super(path);
    }

    @Override
    public InputStream getInputStream() throws IOException {
        return new GZIPInputStream(super.getInputStream());
    }
}

Edit2:这仅适用于输入资源

添加另一个类似的方法 getOutputStream 将不起作用,因为 spring 使用 FileSystemResource.getFile,而不是 FileSystemResource.getOutputStream

经过测试,这种从 S3 中的压缩和编码文件中读取行的简单配置有效。

要点:

  • 实现一个使用 Apache 的 GZIPInputStreamFactoryBufferedReaderFactory,并将其设置为 FlatFileItemReader 上的 bufferedReaderFactory。
  • 使用 AmazonS3Client 从 Spring Cloud 配置一个 SimpleStorageResourceLoader,并使用它在 S3 中获取压缩的平面文件。将其设置为 FlatFileItemReader.
  • 上的资源

注意:读入字符串可以很容易地替换为读入 POJO。

GZIPBufferedReaderFactory.java

使用 Apache 的 GZIPInputStreamFactory

public class GZIPBufferedReaderFactory implements BufferedReaderFactory {

    private final GZIPInputStreamFactory gzipInputStreamFactory;

    public GZIPBufferedReaderFactory(GZIPInputStreamFactory gzipInputStreamFactory) {
        this.gzipInputStreamFactory = gzipInputStreamFactory;
    }

    @Override
    public BufferedReader create(Resource resource, String encoding) throws IOException {
        return new BufferedReader(new InputStreamReader(gzipInputStreamFactory.create(resource.getInputStream()), encoding));
    }
}

AWSConfiguration.java

@Configuration
public class AWSConfiguration {

    @Bean
    public AmazonS3Client s3Client(AWSCredentialsProvider credentials, Region region) {
        ClientConfiguration clientConfig = new ClientConfiguration();

        AmazonS3Client client = new AmazonS3Client(credentials, clientConfig);
        client.setRegion(region);
        return client;
    }
}

如何配置 AWSCredentialsProviderRegion bean 可能会有所不同,我不会在这里详细说明,因为其他地方有文档。

BatchConfiguration.java

@Configuration
@EnableBatchProcessing
public class SignalsIndexBatchConfiguration {

    @Autowired
    public AmazonS3Client s3Client;

    @Bean
    public GZIPInputStreamFactory gzipInputStreamFactory() {
        return new GZIPInputStreamFactory();
    }

    @Bean
    public GZIPBufferedReaderFactory gzipBufferedReaderFactory(GZIPInputStreamFactory gzipInputStreamFactory) {
        return new GZIPBufferedReaderFactory(gzipInputStreamFactory);
    }

    @Bean
    public SimpleStorageResourceLoader simpleStorageResourceLoader() {
        return new SimpleStorageResourceLoader(s3Client);
    }

    @Bean
    @StepScope
    protected FlatFileItemReader<String> itemReader(
            SimpleStorageResourceLoader simpleStorageResourceLoader,
            GZIPBufferedReaderFactory gzipBufferedReaderFactory) {
        FlatFileItemReader<String> flatFileItemReader = new FlatFileItemReader<>();
        flatFileItemReader.setBufferedReaderFactory(gzipBufferedReaderFactory);
        flatFileItemReader.setResource(simpleStorageResourceLoader.getResource("s3://YOUR_FLAT_FILE.csv"));
        flatFileItemReader.setLineMapper(new PassThroughLineMapper());
        return flatFileItemReader;
    }

    @Bean
    public Job job(Step step) {
        return jobBuilderFactory.get("job").start(step).build();
    }

    @Bean
    protected Step step(GZIPInputStreamFactory gzipInputStreamFactory) {
        return stepBuilderFactory.get("step")
                .<String, String> chunk(200)
                .reader(itemReader(simpleStorageResourceLoader(), gzipBufferedReaderFactory(gzipInputStreamFactory)))
                .processor(itemProcessor())
                .faultTolerant()
                .build();
    }

    /*
     * These components are some of what we
     * get for free with the @EnableBatchProcessing annotation
     */
    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    public JobRepository jobRepository;
    /*
     * END Freebies
     */

    @Bean
    public JobLauncher jobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }
}