Spring 动态块大小的批量自定义完成策略
Spring Batch custom completion policy for dynamic chunk size
上下文
我们有一个批处理作业,可以将本地化的国家/地区名称(即将国家/地区名称翻译成不同语言)从外部数据库复制到我们的数据库中。这个想法是在一个块中处理一个国家的所有本地化国家名称(即第一个块 - 安道尔的所有翻译,下一个块 - U.A.E 的所有翻译,等等)。我们使用 JdbcCursorItemReader
来读取外部数据 + 一些 oracle 分析函数来提供该国家/地区可用的翻译总数:类似于
select country_code, language_code, localized_name, COUNT(1) OVER(PARTITION BY c_lng.country_code) as lng_count
from EXT_COUNTRY_LNG c_lng
order by c_lng.countty_code, c_lng.language_code
问题
所以按块切割这个输入看起来很简单:当你读取了 lng_count
中指定的确切行数时停止块,并从下一个读取行开始一个新的块,但它似乎不是实际上如此简单:(
首先要尝试的是自定义完成策略。但问题是,它无权访问最后一项,由 ItemReader
读取 - 您应该明确地将其放入 reader 的上下文中,然后将其取回策略中。不喜欢它,因为它需要额外的 reader modifications/adding reader 听众。此外,我不喜欢同一个项目来回 serialized/deserialized。而且我觉得 JobContext
/StepContext
不是存放此类数据的好地方。
还有 RepeatContext
看起来更适合存放此类数据,但我没能轻松 ...
所以最后我们得到这样的解决方案:
@Bean(name = "localizedCountryNamesStep")
@JobScope
public Step insertCountryStep(
final StepBuilderFactory stepBuilderFactory,
final MasterdataCountryNameReader countryNameReader,
final MasterdataCountryNameProcessor countryNameProcessor,
final MasterdataCountryNameWriter writer) {
/* Use the same fixed-commit policy, but update it's chunk size dynamically */
final SimpleCompletionPolicy policy = new SimpleCompletionPolicy();
return stepBuilderFactory.get("localizedCountryNamesStep")
.<ExtCountryLng, LocalizedCountryName> chunk(policy)
.reader(countryNameReader)
.listener(new ItemReadListener<ExtCountryLng>() {
@Override
public void beforeRead() {
// do nothing
}
@Override
public void afterRead(final ExtCountryLng item) {
/* Update the cunk size after every read: consequent reads
inside the same country = same chunk do nothing since lngCount is always the same there */
policy.setChunkSize(item.getLngCount());
}
@Override
public void onReadError(final Exception ex) {
// do nothing
}
})
.processor(countryNameProcessor)
.writer(writer)
.faultTolerant()
.skip(RuntimeException.class)
.skipLimit(Integer.MAX_VALUE) // Batch does not support unlimited skip
.retryLimit(0) // this solution disables only retry, but not recover
.build();
}
它工作正常,需要最少的代码更改,但对我来说还是有点难看。所以我想知道,当 ItemReader
中已经提供了所有必需的信息时,是否还有另一种优雅的方法可以在 Spring 批处理中执行动态块大小?
最简单的方法是简单地按国家/地区划分步骤。这样每个国家都会有自己的步骤,你也可以跨国家线程以提高性能。
如果需要单个 reader,您可以包装一个委托 PeekableItemReader
并扩展 SimpleCompletionPolicy
来实现您的目标。
public class CountryPeekingCompletionPolicyReader extends SimpleCompletionPolicy implements ItemReader<CountrySpecificItem> {
private PeekableItemReader<? extends CountrySpecificItem> delegate;
private CountrySpecificItem currentReadItem = null;
@Override
public CountrySpecificItem read() throws UnexpectedInputException, ParseException, NonTransientResourceException, Exception {
currentReadItem = delegate.read();
return currentReadItem;
}
@Override
public RepeatContext start(final RepeatContext context) {
return new ComparisonPolicyTerminationContext(context);
}
protected class ComparisonPolicyTerminationContext extends SimpleTerminationContext {
public ComparisonPolicyTerminationContext(final RepeatContext context) {
super(context);
}
@Override
public boolean isComplete() {
final CountrySpecificItem nextReadItem = delegate.peek();
// logic to check if same country
if (currentReadItem.isSameCountry(nextReadItem)) {
return false;
}
return true;
}
}
}
那么在你的上下文中你会定义:
<batch:tasklet>
<batch:chunk chunk-completion-policy="countrySpecificCompletionPolicy" reader="countrySpecificCompletionPolicy" writer="someWriter" />
</batch:tasklet>
<bean id="countrySpecificCompletionPolicy" class="CountryPeekingCompletionPolicyReader">
<property name="delegate" ref="peekableReader" />
</bean>
<bean id="peekableReader" class="YourPeekableItemReader" />
编辑: 回想你的问题,分区让我觉得是最干净的方法。使用 partitioned step,每个 ItemReader(确保 scope="step"
)将从步骤执行上下文传递一个 countryName
。是的,您需要一个自定义 Partitioner
class 来构建您的执行上下文图(每个国家一个条目)和一个足够大的硬编码提交间隔以容纳您最大的工作单元,但是在那之后一切都是非常样板化的,并且由于每个从属步骤只是一个单独的块,对于任何可能遇到问题的国家来说,重启应该是相对容易的。
上下文
我们有一个批处理作业,可以将本地化的国家/地区名称(即将国家/地区名称翻译成不同语言)从外部数据库复制到我们的数据库中。这个想法是在一个块中处理一个国家的所有本地化国家名称(即第一个块 - 安道尔的所有翻译,下一个块 - U.A.E 的所有翻译,等等)。我们使用 JdbcCursorItemReader
来读取外部数据 + 一些 oracle 分析函数来提供该国家/地区可用的翻译总数:类似于
select country_code, language_code, localized_name, COUNT(1) OVER(PARTITION BY c_lng.country_code) as lng_count
from EXT_COUNTRY_LNG c_lng
order by c_lng.countty_code, c_lng.language_code
问题
所以按块切割这个输入看起来很简单:当你读取了 lng_count
中指定的确切行数时停止块,并从下一个读取行开始一个新的块,但它似乎不是实际上如此简单:(
首先要尝试的是自定义完成策略。但问题是,它无权访问最后一项,由 ItemReader
读取 - 您应该明确地将其放入 reader 的上下文中,然后将其取回策略中。不喜欢它,因为它需要额外的 reader modifications/adding reader 听众。此外,我不喜欢同一个项目来回 serialized/deserialized。而且我觉得 JobContext
/StepContext
不是存放此类数据的好地方。
还有 RepeatContext
看起来更适合存放此类数据,但我没能轻松 ...
所以最后我们得到这样的解决方案:
@Bean(name = "localizedCountryNamesStep")
@JobScope
public Step insertCountryStep(
final StepBuilderFactory stepBuilderFactory,
final MasterdataCountryNameReader countryNameReader,
final MasterdataCountryNameProcessor countryNameProcessor,
final MasterdataCountryNameWriter writer) {
/* Use the same fixed-commit policy, but update it's chunk size dynamically */
final SimpleCompletionPolicy policy = new SimpleCompletionPolicy();
return stepBuilderFactory.get("localizedCountryNamesStep")
.<ExtCountryLng, LocalizedCountryName> chunk(policy)
.reader(countryNameReader)
.listener(new ItemReadListener<ExtCountryLng>() {
@Override
public void beforeRead() {
// do nothing
}
@Override
public void afterRead(final ExtCountryLng item) {
/* Update the cunk size after every read: consequent reads
inside the same country = same chunk do nothing since lngCount is always the same there */
policy.setChunkSize(item.getLngCount());
}
@Override
public void onReadError(final Exception ex) {
// do nothing
}
})
.processor(countryNameProcessor)
.writer(writer)
.faultTolerant()
.skip(RuntimeException.class)
.skipLimit(Integer.MAX_VALUE) // Batch does not support unlimited skip
.retryLimit(0) // this solution disables only retry, but not recover
.build();
}
它工作正常,需要最少的代码更改,但对我来说还是有点难看。所以我想知道,当 ItemReader
中已经提供了所有必需的信息时,是否还有另一种优雅的方法可以在 Spring 批处理中执行动态块大小?
最简单的方法是简单地按国家/地区划分步骤。这样每个国家都会有自己的步骤,你也可以跨国家线程以提高性能。
如果需要单个 reader,您可以包装一个委托 PeekableItemReader
并扩展 SimpleCompletionPolicy
来实现您的目标。
public class CountryPeekingCompletionPolicyReader extends SimpleCompletionPolicy implements ItemReader<CountrySpecificItem> {
private PeekableItemReader<? extends CountrySpecificItem> delegate;
private CountrySpecificItem currentReadItem = null;
@Override
public CountrySpecificItem read() throws UnexpectedInputException, ParseException, NonTransientResourceException, Exception {
currentReadItem = delegate.read();
return currentReadItem;
}
@Override
public RepeatContext start(final RepeatContext context) {
return new ComparisonPolicyTerminationContext(context);
}
protected class ComparisonPolicyTerminationContext extends SimpleTerminationContext {
public ComparisonPolicyTerminationContext(final RepeatContext context) {
super(context);
}
@Override
public boolean isComplete() {
final CountrySpecificItem nextReadItem = delegate.peek();
// logic to check if same country
if (currentReadItem.isSameCountry(nextReadItem)) {
return false;
}
return true;
}
}
}
那么在你的上下文中你会定义:
<batch:tasklet>
<batch:chunk chunk-completion-policy="countrySpecificCompletionPolicy" reader="countrySpecificCompletionPolicy" writer="someWriter" />
</batch:tasklet>
<bean id="countrySpecificCompletionPolicy" class="CountryPeekingCompletionPolicyReader">
<property name="delegate" ref="peekableReader" />
</bean>
<bean id="peekableReader" class="YourPeekableItemReader" />
编辑: 回想你的问题,分区让我觉得是最干净的方法。使用 partitioned step,每个 ItemReader(确保 scope="step"
)将从步骤执行上下文传递一个 countryName
。是的,您需要一个自定义 Partitioner
class 来构建您的执行上下文图(每个国家一个条目)和一个足够大的硬编码提交间隔以容纳您最大的工作单元,但是在那之后一切都是非常样板化的,并且由于每个从属步骤只是一个单独的块,对于任何可能遇到问题的国家来说,重启应该是相对容易的。