Univocity bean 处理器在分布式系统中表现出不一致的行为
Univocity bean processor showing inconsistent behaviour in distributed system
我正在使用 univocity bean 处理器进行文件解析。我能够在本地盒子上成功使用它。但是在具有多个主机的环境中部署相同的代码时,解析器表现出不一致的行为。对于无效文件,它不会处理失败,对于有效文件,它有时也会处理失败。
想知道 bean 处理器实现是否适合多线程分布式环境。
示例代码:
private void validateFile(@Nonnull final File inputFile) throws NonRetriableException {
try {
final BeanProcessor<TargetingInputBean> rowProcessor = new BeanProcessor<TargetingInputBean>(
TargetingInputBean.class) {
@Override
public void beanProcessed(@Nonnull final TargetingInputBean targetingInputBean,
@Nonnull final ParsingContext context) {
final String customerId = targetingInputBean.getCustomerId();
final String segmentId = targetingInputBean.getSegmentId();
log.debug("Validating customerId {} segmentId {} for {} file", customerId, segmentId, inputFile
.getAbsolutePath());
if (StringUtils.isBlank(customerId) || StringUtils.isBlank(segmentId)) {
throw new DataProcessingException("customerId or segmentId is blank");
}
try {
someValidation(customerId);
} catch (IllegalArgumentException ex) {
throw new DataProcessingException(
String.format("customerId %s is not in required format. Exception"
+ " message %s", customerId, ex.getMessage()),
ex);
}
}
};
rowProcessor.setStrictHeaderValidationEnabled(true);
final CsvParser parser = new CsvParser(getCSVParserSettings(rowProcessor));
parser.parse(inputFile);
} catch (TextParsingException ex) {
throw new NonRetriableException(
String.format("Exception=%s occurred while getting & parsing targeting file "
+ "contents, error=%s", ex.getClass(), ex.getMessage()),
ex);
}
}
private CsvParserSettings getCSVParserSettings(@Nonnull final BeanProcessor<TargetingInputBean> rowProcessor) {
final CsvParserSettings parserSettings = new CsvParserSettings();
parserSettings.setProcessor(rowProcessor);
parserSettings.setHeaderExtractionEnabled(true);
parserSettings.getFormat().setDelimiter(AIRCubeTargetingFileConstants.FILE_SEPARATOR);
return parserSettings;
}
TargetingInputBean:
public class TargetingInputBean {
@Parsed(field = "CustomerId")
private String customerId;
@Parsed(field = "SegmentId")
private String segmentId;
}
您使用的是最新版本吗?
我刚刚意识到您可能受到了 2.5.0 版中引入的错误的影响,如果我没记错的话,该错误已在 2.5.6 版中修复。这困扰了我一段时间,因为这是一个很难追踪的内部并发问题。基本上当你传递一个没有显式编码的文件时,它会尝试在输入中找到一个 UTF BOM 标记(有效地消耗第一个字符)来自动确定编码。这仅发生在 InputStreams 和文件中。
无论如何,这个问题已经修复,所以只需更新到最新版本就可以解决您的问题(如果您使用的不是版本 2,请告诉我。5.something)
如果您想保留现有的当前版本,调用
错误将消失
parser.parse(inputFile, Charset.defaultCharset());
这将阻止解析器尝试发现您的文件中是否有 BOM 标记,从而避免该讨厌的错误。
希望这对您有所帮助
我正在使用 univocity bean 处理器进行文件解析。我能够在本地盒子上成功使用它。但是在具有多个主机的环境中部署相同的代码时,解析器表现出不一致的行为。对于无效文件,它不会处理失败,对于有效文件,它有时也会处理失败。
想知道 bean 处理器实现是否适合多线程分布式环境。
示例代码:
private void validateFile(@Nonnull final File inputFile) throws NonRetriableException {
try {
final BeanProcessor<TargetingInputBean> rowProcessor = new BeanProcessor<TargetingInputBean>(
TargetingInputBean.class) {
@Override
public void beanProcessed(@Nonnull final TargetingInputBean targetingInputBean,
@Nonnull final ParsingContext context) {
final String customerId = targetingInputBean.getCustomerId();
final String segmentId = targetingInputBean.getSegmentId();
log.debug("Validating customerId {} segmentId {} for {} file", customerId, segmentId, inputFile
.getAbsolutePath());
if (StringUtils.isBlank(customerId) || StringUtils.isBlank(segmentId)) {
throw new DataProcessingException("customerId or segmentId is blank");
}
try {
someValidation(customerId);
} catch (IllegalArgumentException ex) {
throw new DataProcessingException(
String.format("customerId %s is not in required format. Exception"
+ " message %s", customerId, ex.getMessage()),
ex);
}
}
};
rowProcessor.setStrictHeaderValidationEnabled(true);
final CsvParser parser = new CsvParser(getCSVParserSettings(rowProcessor));
parser.parse(inputFile);
} catch (TextParsingException ex) {
throw new NonRetriableException(
String.format("Exception=%s occurred while getting & parsing targeting file "
+ "contents, error=%s", ex.getClass(), ex.getMessage()),
ex);
}
}
private CsvParserSettings getCSVParserSettings(@Nonnull final BeanProcessor<TargetingInputBean> rowProcessor) {
final CsvParserSettings parserSettings = new CsvParserSettings();
parserSettings.setProcessor(rowProcessor);
parserSettings.setHeaderExtractionEnabled(true);
parserSettings.getFormat().setDelimiter(AIRCubeTargetingFileConstants.FILE_SEPARATOR);
return parserSettings;
}
TargetingInputBean:
public class TargetingInputBean {
@Parsed(field = "CustomerId")
private String customerId;
@Parsed(field = "SegmentId")
private String segmentId;
}
您使用的是最新版本吗?
我刚刚意识到您可能受到了 2.5.0 版中引入的错误的影响,如果我没记错的话,该错误已在 2.5.6 版中修复。这困扰了我一段时间,因为这是一个很难追踪的内部并发问题。基本上当你传递一个没有显式编码的文件时,它会尝试在输入中找到一个 UTF BOM 标记(有效地消耗第一个字符)来自动确定编码。这仅发生在 InputStreams 和文件中。
无论如何,这个问题已经修复,所以只需更新到最新版本就可以解决您的问题(如果您使用的不是版本 2,请告诉我。5.something)
如果您想保留现有的当前版本,调用
错误将消失parser.parse(inputFile, Charset.defaultCharset());
这将阻止解析器尝试发现您的文件中是否有 BOM 标记,从而避免该讨厌的错误。
希望这对您有所帮助