Spring 启动批处理 - MultiResourceItemReader:出错时移至下一个文件
Spring Boot batch - MultiResourceItemReader : move to next file on error
在批处理服务中,我使用委托给 StaxEventItemReader 的 MultiResourceItemReader 读取多个 XML 文件。
如果在读取文件时出现错误(例如解析异常),我想指定 Spring 开始读取下一个匹配的文件。使用@OnReadError 注释 and/or 例如 SkipPolicy。
目前,当读取异常抛出时,批处理停止。
有人知道怎么做吗?
编辑:我看到 MultiResourceItemReader 有一个方法 readNextItem(),但它是私有的 -_-
我有一段时间没有使用 SB,但正在寻找 MultiResourceItemReader
代码 我想你可以编写自己的 ResourceAwareItemReaderItemStream
包装器,你可以在其中检查设置为移动到下一个文件或使用委托执行标准读取。
此标志可以存储到执行上下文或包装器中,并且应该在下一步移动后清除。
class MoveNextReader<T> implements ResourceAwareItemReaderItemStream<T> {
private ResourceAwareItemReaderItemStream delegate;
private boolean skipThisFile = false;
public void setSkipThisFile(boolean value) {
skipThisFile = value;
}
public void setResource(Resource resource) {
skipThisFile = false;
delegate.setResource(resource);
}
public T read() {
if(skipThisFile) {
skipThisFile = false;
// This force MultiResourceItemReader to move to next resource
return null;
}
return delegate.read();
}
}
使用此 class 作为 MultiResourceItemReader
的委托,并在 @OnReadError
中注入 MoveNextReader
并设置 MoveNextReader.skipThisFile
.
我无法自己测试代码,但我希望这是一个很好的起点。
这是我最后的 类 读取多个 XML 文件并在一个文件发生读取错误时跳转到下一个文件(感谢 Luca 的想法)。
我的自定义 ItemReader,从 MultiResourceItemReader 扩展:
public class MyItemReader extends MultiResourceItemReader<InputElement> {
private SkippableResourceItemReader<InputElement> reader;
public MyItemReader() throws IOException {
super();
// Resources
PathMatchingResourcePatternResolver resourceResolver = new PathMatchingResourcePatternResolver();
this.setResources( resourceResolver.getResources( "classpath:input/inputFile*.xml" ) );
// Delegate reader
reader = new SkippableResourceItemReader<InputElement>();
StaxEventItemReader<InputElement> delegateReader = new StaxEventItemReader<InputElement>();
delegateReader.setFragmentRootElementName("inputElement");
Jaxb2Marshaller unmarshaller = new Jaxb2Marshaller();
unmarshaller.setClassesToBeBound( InputElement.class );
delegateReader.setUnmarshaller( unmarshaller );
reader.setDelegate( delegateReader );
this.setDelegate( reader );
}
[...]
@OnReadError
public void onReadError( Exception exception ){
reader.setSkipResource( true );
}
}
中间的 ItemReader 用于跳过当前资源:
public class SkippableResourceItemReader<T> implements ResourceAwareItemReaderItemStream<T> {
private ResourceAwareItemReaderItemStream<T> delegate;
private boolean skipResource = false;
@Override
public void close() throws ItemStreamException {
delegate.close();
}
@Override
public T read() throws UnexpectedInputException, ParseException, NonTransientResourceException, Exception {
if( skipResource ){
skipResource = false;
return null;
}
return delegate.read();
}
@Override
public void setResource( Resource resource ) {
skipResource = false;
delegate.setResource( resource );
}
@Override
public void open( ExecutionContext executionContext ) throws ItemStreamException {
delegate.open( executionContext );
}
@Override
public void update( ExecutionContext executionContext ) throws ItemStreamException {
delegate.update( executionContext );
}
public void setDelegate(ResourceAwareItemReaderItemStream<T> delegate) {
this.delegate = delegate;
}
public void setSkipResource( boolean skipResource ) {
this.skipResource = skipResource;
}
}
在批处理服务中,我使用委托给 StaxEventItemReader 的 MultiResourceItemReader 读取多个 XML 文件。
如果在读取文件时出现错误(例如解析异常),我想指定 Spring 开始读取下一个匹配的文件。使用@OnReadError 注释 and/or 例如 SkipPolicy。
目前,当读取异常抛出时,批处理停止。
有人知道怎么做吗?
编辑:我看到 MultiResourceItemReader 有一个方法 readNextItem(),但它是私有的 -_-
我有一段时间没有使用 SB,但正在寻找 MultiResourceItemReader
代码 我想你可以编写自己的 ResourceAwareItemReaderItemStream
包装器,你可以在其中检查设置为移动到下一个文件或使用委托执行标准读取。
此标志可以存储到执行上下文或包装器中,并且应该在下一步移动后清除。
class MoveNextReader<T> implements ResourceAwareItemReaderItemStream<T> {
private ResourceAwareItemReaderItemStream delegate;
private boolean skipThisFile = false;
public void setSkipThisFile(boolean value) {
skipThisFile = value;
}
public void setResource(Resource resource) {
skipThisFile = false;
delegate.setResource(resource);
}
public T read() {
if(skipThisFile) {
skipThisFile = false;
// This force MultiResourceItemReader to move to next resource
return null;
}
return delegate.read();
}
}
使用此 class 作为 MultiResourceItemReader
的委托,并在 @OnReadError
中注入 MoveNextReader
并设置 MoveNextReader.skipThisFile
.
我无法自己测试代码,但我希望这是一个很好的起点。
这是我最后的 类 读取多个 XML 文件并在一个文件发生读取错误时跳转到下一个文件(感谢 Luca 的想法)。
我的自定义 ItemReader,从 MultiResourceItemReader 扩展:
public class MyItemReader extends MultiResourceItemReader<InputElement> {
private SkippableResourceItemReader<InputElement> reader;
public MyItemReader() throws IOException {
super();
// Resources
PathMatchingResourcePatternResolver resourceResolver = new PathMatchingResourcePatternResolver();
this.setResources( resourceResolver.getResources( "classpath:input/inputFile*.xml" ) );
// Delegate reader
reader = new SkippableResourceItemReader<InputElement>();
StaxEventItemReader<InputElement> delegateReader = new StaxEventItemReader<InputElement>();
delegateReader.setFragmentRootElementName("inputElement");
Jaxb2Marshaller unmarshaller = new Jaxb2Marshaller();
unmarshaller.setClassesToBeBound( InputElement.class );
delegateReader.setUnmarshaller( unmarshaller );
reader.setDelegate( delegateReader );
this.setDelegate( reader );
}
[...]
@OnReadError
public void onReadError( Exception exception ){
reader.setSkipResource( true );
}
}
中间的 ItemReader 用于跳过当前资源:
public class SkippableResourceItemReader<T> implements ResourceAwareItemReaderItemStream<T> {
private ResourceAwareItemReaderItemStream<T> delegate;
private boolean skipResource = false;
@Override
public void close() throws ItemStreamException {
delegate.close();
}
@Override
public T read() throws UnexpectedInputException, ParseException, NonTransientResourceException, Exception {
if( skipResource ){
skipResource = false;
return null;
}
return delegate.read();
}
@Override
public void setResource( Resource resource ) {
skipResource = false;
delegate.setResource( resource );
}
@Override
public void open( ExecutionContext executionContext ) throws ItemStreamException {
delegate.open( executionContext );
}
@Override
public void update( ExecutionContext executionContext ) throws ItemStreamException {
delegate.update( executionContext );
}
public void setDelegate(ResourceAwareItemReaderItemStream<T> delegate) {
this.delegate = delegate;
}
public void setSkipResource( boolean skipResource ) {
this.skipResource = skipResource;
}
}