Spring Batch - FlatFileItemWriter Error 14416: Stream is already closed
Basically, I have a Spring Batch job that queries a database and implements a Partitioner to build the partitions, which are then handed to the slave step via a ThreadPoolTaskExecutor.
The Reader reads the partitioned data from the database. The Writer loads the data into CSV files in Azure Blob Storage.
The Job Partitioner and Reader work fine. The Writer writes one file and then closes, so the remaining partitions cannot finish because the stream is already closed. I get the following error:
Reading: market1
Reading: market2
Reading: market3
Reading: market4
Reading: market5
Writter: /upload-demo/market3_2021-06-01.csv
Writter: /upload-demo/market5_2021-06-01.csv
Writter: /upload-demo/market4_63_2021-06-01.csv
Writter: /upload-demo/market2_2021-06-01.csv
Writter: /upload-demo/market1_11_2021-06-01.csv
2021-06-02 08:24:42.304 ERROR 20356 --- [ taskExecutor-3] c.a.storage.common.StorageOutputStream : Stream is already closed.
2021-06-02 08:24:42.307 WARN 20356 --- [ taskExecutor-3] o.s.b.f.support.DisposableBeanAdapter : Destroy method 'close' on bean with name 'scopedTarget.writer2' threw an exception: java.lang.RuntimeException: Stream is already closed.
Reading: market6
Writter: /upload-demo/market6_2021-06-01.csv
Here is my batch configuration:
@EnableBatchProcessing
@Configuration
public class BatchConfig extends DefaultBatchConfigurer {
String connectionString = "azureConnectionString";
String containerName = "upload-demo";
String endpoint = "azureHttpsEndpoint";
String accountName = "azureAccountName";
String accountKey = "accountKey";
StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey);
BlobServiceClient client = new BlobServiceClientBuilder().connectionString(connectionString).endpoint(endpoint).buildClient();
@Autowired
private StepBuilderFactory steps;
@Autowired
private JobBuilderFactory jobs;
@Autowired
@Qualifier("verticaDb")
private DataSource verticaDataSource;
@Autowired
private PlatformTransactionManager transactionManager;
@Autowired
private ConsoleItemWriter consoleItemWriter;
@Autowired
private ItemWriter itemWriter;
@Bean
public Job job() throws Exception {
return jobs.get("job1")
.start(masterStep(null, null))
.incrementer(new RunIdIncrementer())
.build();
}
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(5);
taskExecutor.setMaxPoolSize(10);
taskExecutor.initialize();
return taskExecutor;
}
@Bean
@JobScope
public Step masterStep(@Value("#{jobParameters['startDate']}") String startDate,
@Value("#{jobParameters['endDate']}") String endDate) throws Exception {
return steps.get("masterStep")
.partitioner(slaveStep().getName(), new RangePartitioner(verticaDataSource, startDate, endDate))
.step(slaveStep())
.gridSize(5)
.taskExecutor(taskExecutor())
.build();
}
@Bean
public Step slaveStep() throws Exception {
return steps.get("slaveStep")
.<MarketData, MarketData>chunk(100)
.reader(pagingItemReader(null, null, null))
.faultTolerant()
.skip(NullPointerException.class)
.skipPolicy(new AlwaysSkipItemSkipPolicy())
.writer(writer2(null, null, null)) //consoleItemWriter
.build();
}
@Bean
@StepScope
public JdbcPagingItemReader pagingItemReader(
@Value("#{stepExecutionContext['MarketName']}") String marketName,
@Value("#{jobParameters['startDate']}") String startDate,
@Value("#{jobParameters['endDate']}") String endDate
) throws Exception {
System.out.println("Reading: " + marketName);
SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
Map<String, Order> sortKey = new HashMap<>();
sortKey.put("xbin", Order.ASCENDING);
sortKey.put("ybin", Order.ASCENDING);
provider.setDataSource(this.verticaDataSource);
provider.setDatabaseType("POSTGRES");
provider.setSelectClause("SELECT MARKET AS market, EPSG AS epsg, XBIN AS xbin, YBIN AS ybin, " +
"LATITUDE AS latitude, LONGITUDE AS longitude, " +
"SUM(TOTALUPLINKVOLUME) AS totalDownlinkVol, SUM(TOTALDOWNLINKVOLUME) AS totalUplinkVol");
provider.setFromClause("FROM views.geo_analytics");
provider.setWhereClause(
"WHERE market='" + marketName + "'" +
" AND STARTTIME >= '" + startDate + "'" +
" AND STARTTIME < '" + endDate + "'" +
" AND TOTALUPLINKVOLUME IS NOT NULL" +
" AND TOTALUPLINKVOLUME > 0" +
" AND TOTALDOWNLINKVOLUME IS NOT NULL" +
" AND TOTALDOWNLINKVOLUME > 0" +
" AND EPSG IS NOT NULL" +
" AND LATITUDE IS NOT NULL" +
" AND LONGITUDE IS NOT NULL" +
" AND XBIN IS NOT NULL" +
" AND YBIN IS NOT NULL"
);
provider.setGroupClause("GROUP BY XBIN, YBIN, MARKET, EPSG, LATITUDE, LONGITUDE");
provider.setSortKeys(sortKey);
JdbcPagingItemReader reader = new JdbcPagingItemReader();
reader.setDataSource(this.verticaDataSource);
reader.setQueryProvider(provider.getObject());
reader.setFetchSize(1000);
reader.setRowMapper(new BeanPropertyRowMapper() {
{
setMappedClass((MarketData.class));
}
});
return reader;
}
@Bean
@StepScope
public FlatFileItemWriter<MarketData> writer2(@Value("#{jobParameters['yearMonth']}") String yearMonth,
@Value("#{stepExecutionContext['marketName']}") String marketName,
@Value("#{jobParameters['startDate']}") String startDate) throws URISyntaxException, InvalidKeyException, StorageException, IOException {
AZBlobWriter<MarketData> writer = new AZBlobWriter<>();
String fullPath = marketName + "_" + startDate + ".csv";
String resourceString = "azure-blob://upload-demo/" + fullPath;
CloudStorageAccount storageAccount = CloudStorageAccount.parse(connectionString);
CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
CloudBlobContainer container2 = blobClient.getContainerReference(containerName);
container2.createIfNotExists();
AzureStorageResourcePatternResolver storageResourcePatternResolver = new AzureStorageResourcePatternResolver(client);
Resource resource = storageResourcePatternResolver.getResource(resourceString);
System.out.println("Writter: " + resource.getURI().getPath().toString());
writer.setResource(resource);
writer.setStorage(container2);
writer.setLineAggregator(new DelimitedLineAggregator<MarketData>() {
{
setDelimiter(",");
setFieldExtractor(new BeanWrapperFieldExtractor<MarketData>() {
{
setNames(new String[] {
"market",
"epsg",
"xbin",
"ybin",
"latitude",
"longitude",
"totalDownlinkVol",
"totalUplinkVol"
});
}
});
}
});
return writer;
}
}
Previously I ran into other issues, such as setting the FlatFileItemWriter's resource to an Azure Blob.
Following @Mahmoud Ben Hassine's suggestion, I implemented a FlatFileItemWriter for Azure Blob.
I based it on this post with a FlatFileItemWriter implementation for GCP:
Here is the implementation for Azure Blob:
public class AZBlobWriter<T> extends FlatFileItemWriter<T> {
private CloudBlobContainer storage;
private Resource resource;
private static final String DEFAULT_LINE_SEPARATOR = System.getProperty("line.separator");
private OutputStream os;
private String lineSeparator = DEFAULT_LINE_SEPARATOR;
@Override
public void write(List<? extends T> items) throws Exception {
StringBuilder lines = new StringBuilder();
for (T item : items) {
lines.append(item).append(lineSeparator);
}
byte[] bytes = lines.toString().getBytes();
try {
os.write(bytes);
}
catch (IOException e) {
throw new WriteFailedException("Could not write data. The file may be corrupt.", e);
}
os.flush();
}
@Override
public void open(ExecutionContext executionContext) {
try {
os = ((WritableResource)resource).getOutputStream();
String bucket = resource.getURI().getHost();
String filePath = resource.getURI().getPath().substring(1);
CloudBlockBlob blob = storage.getBlockBlobReference(filePath);
} catch (IOException e) {
e.printStackTrace();
} catch (StorageException e) {
e.printStackTrace();
} catch (URISyntaxException e) {
e.printStackTrace();
}
}
@Override
public void update(ExecutionContext executionContext) {
}
@Override
public void close() {
super.close();
try {
os.close();
} catch (IOException e) {
e.printStackTrace();
}
}
public void setStorage(CloudBlobContainer storage) {
this.storage = storage;
}
@Override
public void setResource(Resource resource) {
this.resource = resource;
}
}
Any help is greatly appreciated. Apologies for the "dirty code", I am still testing/developing it.
Thanks, Markus.
You did not share the whole stack trace to see exactly when this error happens, but the close method seems to be called more than once. I don't think this is due to a concurrency issue, since I see you are using one writer per thread in the partitioned step. So I would make that method "reentrant" by checking whether the output stream is already closed before closing it (there is no isClosed method on the output stream, so you can use a custom boolean around it).
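As an illustration, here is a minimal sketch of such a guarded close method; the streamClosed flag is an assumed addition to your AZBlobWriter, not part of any Spring Batch API:
// Minimal sketch, assuming a custom "streamClosed" boolean field is added to AZBlobWriter.
private boolean streamClosed = false;

@Override
public void close() {
    super.close();
    if (streamClosed) {
        // The stream was already closed by an earlier call; do nothing.
        return;
    }
    try {
        if (os != null) {
            os.close();
        }
        streamClosed = true;
    } catch (IOException e) {
        e.printStackTrace();
    }
}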
That said, I would first confirm that the close method is indeed called twice and, if so, investigate why and fix the root cause.
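For example, one hypothetical way to confirm this is to count and log each invocation of close before delegating; the closeCount field below is illustrative only:
// Hypothetical instrumentation to see how many times close() is invoked and from which thread.
private final java.util.concurrent.atomic.AtomicInteger closeCount = new java.util.concurrent.atomic.AtomicInteger();

@Override
public void close() {
    int n = closeCount.incrementAndGet();
    System.out.println("AZBlobWriter.close() call #" + n + " on thread "
            + Thread.currentThread().getName());
    super.close();
    try {
        os.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
}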