Spring Batch - FlatFileItemWriter Error 14416: Stream is already closed

Basically, I have a Spring Batch job that queries a database, implements a Partitioner to get the work items, and assigns them to ThreadPoolTaskExecutors in a SlaveStep.

The Reader reads the work items from the database. The Writer loads the data into CSV files in Azure Blob Storage.

The Job Partitioner and Reader work fine. The Writer writes one file and then closes, and the other jobs cannot finish because the stream is already closed. I get the following error:

Reading: market1
Reading: market2
Reading: market3
Reading: market4
Reading: market5
Writter: /upload-demo/market3_2021-06-01.csv
Writter: /upload-demo/market5_2021-06-01.csv
Writter: /upload-demo/market4_63_2021-06-01.csv
Writter: /upload-demo/market2_2021-06-01.csv
Writter: /upload-demo/market1_11_2021-06-01.csv
2021-06-02 08:24:42.304 ERROR 20356 --- [ taskExecutor-3] c.a.storage.common.StorageOutputStream   : Stream is already closed.
2021-06-02 08:24:42.307  WARN 20356 --- [ taskExecutor-3] o.s.b.f.support.DisposableBeanAdapter    : Destroy method 'close' on bean with name 'scopedTarget.writer2' threw an exception: java.lang.RuntimeException: Stream is already closed.
Reading: market6
Writter: /upload-demo/market6_2021-06-01.csv

Here is my batch configuration:

@EnableBatchProcessing
@Configuration
public class BatchConfig extends DefaultBatchConfigurer {

    String connectionString = "azureConnectionString";

    String containerName = "upload-demo";

    String endpoint = "azureHttpsEndpoint";

    String accountName ="azureAccountName";
    String accountKey = "accountKey";

    StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey);
    BlobServiceClient client = new BlobServiceClientBuilder().connectionString(connectionString).endpoint(endpoint).buildClient();

    @Autowired
    private StepBuilderFactory steps;

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    @Qualifier("verticaDb")
    private DataSource verticaDataSource;

    @Autowired
    private PlatformTransactionManager transactionManager;

    @Autowired
    private ConsoleItemWriter consoleItemWriter;

    @Autowired
    private ItemWriter itemWriter;

    @Bean
    public Job job() throws Exception {
        return jobs.get("job1")
                .start(masterStep(null, null))
                .incrementer(new RunIdIncrementer())
                .build();
    }

    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setMaxPoolSize(10);
        taskExecutor.initialize();
        return taskExecutor;
    }

    @Bean
    @JobScope
    public Step masterStep(@Value("#{jobParameters['startDate']}") String startDate,
                           @Value("#{jobParameters['endDate']}") String endDate) throws Exception {

        return steps.get("masterStep")
                .partitioner(slaveStep().getName(), new RangePartitioner(verticaDataSource, startDate, endDate))
                .step(slaveStep())
                .gridSize(5)
                .taskExecutor(taskExecutor())
                .build();
    }

    @Bean
    public Step slaveStep() throws Exception {
        return steps.get("slaveStep")
                .<MarketData, MarketData>chunk(100)
                .reader(pagingItemReader(null, null, null))
                .faultTolerant()
                .skip(NullPointerException.class)
                .skipPolicy(new AlwaysSkipItemSkipPolicy())
                .writer(writer2(null, null, null))  //consoleItemWriter
                .build();
    }

    @Bean
    @StepScope
    public JdbcPagingItemReader pagingItemReader(
            @Value("#{stepExecutionContext['MarketName']}") String marketName,
            @Value("#{jobParameters['startDate']}") String startDate,
            @Value("#{jobParameters['endDate']}") String endDate
            ) throws Exception {

System.out.println("Reading: " + marketName);

        SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();

        Map<String, Order> sortKey = new HashMap<>();
        sortKey.put("xbin", Order.ASCENDING);
        sortKey.put("ybin", Order.ASCENDING);

        provider.setDataSource(this.verticaDataSource);
        provider.setDatabaseType("POSTGRES");
        provider.setSelectClause("SELECT MARKET AS market, EPSG AS epsg, XBIN AS xbin, YBIN AS ybin, " +
                        "LATITUDE AS latitude, LONGITUDE AS longitude, " +
                        "SUM(TOTALUPLINKVOLUME) AS totalDownlinkVol, SUM(TOTALDOWNLINKVOLUME) AS totalUplinkVol");
        provider.setFromClause("FROM views.geo_analytics");
        provider.setWhereClause(
                "WHERE market='" + marketName + "'" +
                        " AND STARTTIME >= '" + startDate + "'" +
                        " AND STARTTIME < '" + endDate + "'" +
                        " AND TOTALUPLINKVOLUME IS NOT NULL" +
                        " AND TOTALUPLINKVOLUME > 0" +
                        " AND TOTALDOWNLINKVOLUME IS NOT NULL" +
                        " AND TOTALDOWNLINKVOLUME > 0" +
                        " AND EPSG IS NOT NULL" +
                        " AND LATITUDE IS NOT NULL" +
                        " AND LONGITUDE IS NOT NULL" +
                        " AND XBIN IS NOT NULL" +
                        " AND YBIN IS NOT NULL"
        );
        provider.setGroupClause("GROUP BY XBIN, YBIN, MARKET, EPSG, LATITUDE, LONGITUDE");
        provider.setSortKeys(sortKey);

        JdbcPagingItemReader reader = new JdbcPagingItemReader();
        reader.setDataSource(this.verticaDataSource);
        reader.setQueryProvider(provider.getObject());
        reader.setFetchSize(1000);
        reader.setRowMapper(new BeanPropertyRowMapper() {
            {
                setMappedClass((MarketData.class));
            }
        });
        return reader;
    }

    @Bean
    @StepScope
    public FlatFileItemWriter<MarketData> writer2(@Value("#{jobParameters['yearMonth']}") String yearMonth,
                                                 @Value("#{stepExecutionContext['marketName']}") String marketName,
                                                 @Value("#{jobParameters['startDate']}") String startDate) throws URISyntaxException, InvalidKeyException, StorageException, IOException {

        AZBlobWriter<MarketData> writer = new AZBlobWriter<>();

        String fullPath = marketName + "_" + startDate + ".csv";
        String resourceString = "azure-blob://upload-demo/" + fullPath;

        CloudStorageAccount storageAccount = CloudStorageAccount.parse(connectionString);
        CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
        CloudBlobContainer container2 = blobClient.getContainerReference(containerName);
        container2.createIfNotExists();

        AzureStorageResourcePatternResolver storageResourcePatternResolver = new AzureStorageResourcePatternResolver(client);
        Resource resource = storageResourcePatternResolver.getResource(resourceString);


System.out.println("Writter: " + resource.getURI().getPath().toString());

        writer.setResource(resource);
        writer.setStorage(container2);

        writer.setLineAggregator(new DelimitedLineAggregator<MarketData>() {
            {
                setDelimiter(",");
                setFieldExtractor(new BeanWrapperFieldExtractor<MarketData>() {
                    {
                        setNames(new String[] {
                                "market",
                                "epsg",
                                "xbin",
                                "ybin",
                                "latitude",
                                "longitude",
                                "totalDownlinkVol",
                                "totalUplinkVol"
                        });
                    }
                });
            }
        });
        return writer;
    }
}

Previously I ran into other issues, such as setting the resource of the FlatFileItemWriter to an Azure Blob and, as suggested by @Mahmoud Ben Hassine, implementing the FlatFileItemWriter for Azure Blob.

I based the implementation on this post of a FlatFileItemWriter for GCP:

Here is the implementation for Azure Blob:

public class AZBlobWriter<T> extends FlatFileItemWriter<T> {

    private CloudBlobContainer storage;
    private Resource resource;

    private static final String DEFAULT_LINE_SEPARATOR = System.getProperty("line.separator");
    private OutputStream os;

    private String lineSeparator = DEFAULT_LINE_SEPARATOR;

    @Override
    public void write(List<? extends T> items) throws Exception {

        StringBuilder lines = new StringBuilder();
        for (T item : items) {
            lines.append(item).append(lineSeparator);
        }
        byte[] bytes = lines.toString().getBytes();
        try {
            os.write(bytes);
        }
        catch (IOException e) {
            throw new WriteFailedException("Could not write data.  The file may be corrupt.", e);
        }
        os.flush();
    }

    @Override
    public void open(ExecutionContext executionContext) {
        try {
            os = ((WritableResource)resource).getOutputStream();
            String bucket = resource.getURI().getHost();
            String filePath = resource.getURI().getPath().substring(1);

            CloudBlockBlob blob = storage.getBlockBlobReference(filePath);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (StorageException e) {
            e.printStackTrace();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void update(ExecutionContext executionContext) {
    }

    @Override
    public void close() {
        super.close();

        try {
            os.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void setStorage(CloudBlobContainer storage) {
        this.storage = storage;
    }
    @Override
    public void setResource(Resource resource) {
        this.resource = resource;
    }
}

Any help is greatly appreciated. Apologies for the "dirty code", as I am still testing/developing it.

Thanks, Markus.

You did not share the whole stack trace to see exactly when this error happens, but it looks like the close method is being called more than once. I don't think this is due to a concurrency issue, since I see you are using one writer per thread in your partitioned step. So I would make that method "re-entrant" by checking whether the output stream has already been closed before closing it (there is no isClosed method on the output stream, so you can use a custom boolean around it).
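
A minimal sketch of that guarded close(), meant to be dropped into the AZBlobWriter class above (the streamClosed flag is an addition for illustration, not part of the original code):

    // Custom flag, since OutputStream exposes no isClosed() method.
    private boolean streamClosed = false;

    @Override
    public void open(ExecutionContext executionContext) {
        streamClosed = false; // reset the flag whenever a new stream is opened
        // ... open the output stream as in the original implementation ...
    }

    @Override
    public void close() {
        super.close();
        if (streamClosed) {
            return; // a second call (e.g. from the bean's destroy callback) is a no-op
        }
        try {
            os.close();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            streamClosed = true;
        }
    }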

That said, I would first confirm that the close method is indeed called twice, and if so, investigate why and fix the root cause.
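
For example (just a throwaway diagnostic, not part of the fix), printing a stack trace at the top of close() shows how many times it is invoked and from where:

    @Override
    public void close() {
        // Temporary diagnostic: log each invocation of close() with a stack trace.
        new Exception("AZBlobWriter.close() called on thread "
                + Thread.currentThread().getName()).printStackTrace();
        super.close();
        try {
            os.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }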