如何在处理大型 csv 文件时从 spring 批处理中的下一个文件开始 disable/avoid linesToSkp(1)
How to disable/avoid linesToSkp(1) from next file onwards in spring batch while processing large csv file
我们有包含 1 亿条记录的大型 csv 文件,并使用“SystemCommandTasklet”通过拆分包含 100 万条记录的文件,使用 spring 批量加载、读取和写入数据库。以下是片段,
@Bean
@StepScope
public SystemCommandTasklet splitFileTasklet(@Value("#{jobParameters[filePath]}") final String inputFilePath) {
SystemCommandTasklet tasklet = new SystemCommandTasklet();
final File file = BatchUtilities.prefixFile(inputFilePath, AppConstants.PROCESSING_PREFIX);
final String command = configProperties.getBatch().getDataLoadPrep().getSplitCommand() + " " + file.getAbsolutePath() + " " + configProperties.getBatch().getDataLoad().getInputLocation() + System.currentTimeMillis() / 1000;
tasklet.setCommand(command);
tasklet.setTimeout(configProperties.getBatch().getDataLoadPrep().getSplitCommandTimeout());
executionContext.put(AppConstants.FILE_PATH_PARAM, file.getPath());
return tasklet;
}
和batch-config:
batch:
data-load-prep:
input-location: /mnt/mlr/prep/
split-command: split -l 1000000 --additional-suffix=.csv
split-command-timeout: 900000 # 15 min
schedule: "*/60 * * * * *"
lock-at-most: 5m
通过上面的配置,我可以成功地读取负载并写入数据库。但是,在下面的代码片段中发现了一个错误,在拆分文件后,只有第一个文件会有 headers,但下一个拆分文件的第一行没有听众。因此,我必须禁用或避免 FlatFileItemReader(CSVReader) 的 linesToSkip(1) 配置。
@Configuration
public class DataLoadReader {
@Bean
@StepScope
public FlatFileItemReader<DemographicData> demographicDataCSVReader(@Value("#{jobExecutionContext[filePath]}") final String filePath) {
return new FlatFileItemReaderBuilder<DemographicData>()
.name("data-load-csv-reader")
.resource(new FileSystemResource(filePath))
.linesToSkip(1) // Need to avoid this from 2nd splitted file onwards as splitted file does not have headers
.lineMapper(lineMapper())
.build();
}
public LineMapper<DemographicData> lineMapper() {
DefaultLineMapper<DemographicData> defaultLineMapper = new DefaultLineMapper<>();
DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
lineTokenizer.setNames("id", "mdl65DecileNum", "mdl66DecileNum", "hhId", "dob", "firstName", "middleName",
"lastName", "addressLine1", "addressLine2", "cityName", "stdCode", "zipCode", "zipp4Code", "fipsCntyCd",
"fipsStCd", "langName", "regionName", "fipsCntyName", "estimatedIncome");
defaultLineMapper.setLineTokenizer(lineTokenizer);
defaultLineMapper.setFieldSetMapper(new DemographicDataFieldSetMapper());
return defaultLineMapper;
}
}
注意:加载程序在加载时不应跳过第二个文件的第一行。
提前谢谢你。感谢任何建议。
我会在 SystemCommandTasklet
中使用以下命令执行此操作:
tail -n +2 data.csv | split -l 1000000 --additional-suffix=.csv
如果您真的想在 Spring 批处理作业中使用 Java 来完成,您可以使用自定义 reader 或过滤 header 的项目处理器.但我 不 推荐这种方法,因为它为每个项目引入了额外的测试(考虑到输入文件中的大量行,这可能会影响工作的性能)。
我们有包含 1 亿条记录的大型 csv 文件,并使用“SystemCommandTasklet”通过拆分包含 100 万条记录的文件,使用 spring 批量加载、读取和写入数据库。以下是片段,
@Bean
@StepScope
public SystemCommandTasklet splitFileTasklet(@Value("#{jobParameters[filePath]}") final String inputFilePath) {
SystemCommandTasklet tasklet = new SystemCommandTasklet();
final File file = BatchUtilities.prefixFile(inputFilePath, AppConstants.PROCESSING_PREFIX);
final String command = configProperties.getBatch().getDataLoadPrep().getSplitCommand() + " " + file.getAbsolutePath() + " " + configProperties.getBatch().getDataLoad().getInputLocation() + System.currentTimeMillis() / 1000;
tasklet.setCommand(command);
tasklet.setTimeout(configProperties.getBatch().getDataLoadPrep().getSplitCommandTimeout());
executionContext.put(AppConstants.FILE_PATH_PARAM, file.getPath());
return tasklet;
}
和batch-config:
batch:
data-load-prep:
input-location: /mnt/mlr/prep/
split-command: split -l 1000000 --additional-suffix=.csv
split-command-timeout: 900000 # 15 min
schedule: "*/60 * * * * *"
lock-at-most: 5m
通过上面的配置,我可以成功地读取负载并写入数据库。但是,在下面的代码片段中发现了一个错误,在拆分文件后,只有第一个文件会有 headers,但下一个拆分文件的第一行没有听众。因此,我必须禁用或避免 FlatFileItemReader(CSVReader) 的 linesToSkip(1) 配置。
@Configuration
public class DataLoadReader {
@Bean
@StepScope
public FlatFileItemReader<DemographicData> demographicDataCSVReader(@Value("#{jobExecutionContext[filePath]}") final String filePath) {
return new FlatFileItemReaderBuilder<DemographicData>()
.name("data-load-csv-reader")
.resource(new FileSystemResource(filePath))
.linesToSkip(1) // Need to avoid this from 2nd splitted file onwards as splitted file does not have headers
.lineMapper(lineMapper())
.build();
}
public LineMapper<DemographicData> lineMapper() {
DefaultLineMapper<DemographicData> defaultLineMapper = new DefaultLineMapper<>();
DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
lineTokenizer.setNames("id", "mdl65DecileNum", "mdl66DecileNum", "hhId", "dob", "firstName", "middleName",
"lastName", "addressLine1", "addressLine2", "cityName", "stdCode", "zipCode", "zipp4Code", "fipsCntyCd",
"fipsStCd", "langName", "regionName", "fipsCntyName", "estimatedIncome");
defaultLineMapper.setLineTokenizer(lineTokenizer);
defaultLineMapper.setFieldSetMapper(new DemographicDataFieldSetMapper());
return defaultLineMapper;
}
}
注意:加载程序在加载时不应跳过第二个文件的第一行。
提前谢谢你。感谢任何建议。
我会在 SystemCommandTasklet
中使用以下命令执行此操作:
tail -n +2 data.csv | split -l 1000000 --additional-suffix=.csv
如果您真的想在 Spring 批处理作业中使用 Java 来完成,您可以使用自定义 reader 或过滤 header 的项目处理器.但我 不 推荐这种方法,因为它为每个项目引入了额外的测试(考虑到输入文件中的大量行,这可能会影响工作的性能)。