设置 CompletionPolicy 以提交整个文件
set CompletionPolicy to commit entire file
我正在创建一个简单的批处理作业,它读取多个文件并将每个文件的内容作为单独的消息发布到队列中。这是我的代码:-
@Bean
@Qualifier("step1")
public Step step1() {
ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
try {
String files = outputDirectory+"*.txt";
resources = resolver.getResources("file:"+files);
}
catch (Exception e) {
logger.error("Error in the step1",e);
}
customCompletionPolicy.setReader(readerPeek());
return stepBuilderFactory.get("step1")
.<String, String> chunk(customCompletionPolicy)
.reader(multiResourceItemReader())
.writer(messageItemWriter)
.build();
}
@Bean
@StepScope
public MultiResourceItemReader<String> multiResourceItemReader() {
MultiResourceItemReader<String> resourceItemReader = new MultiResourceItemReader<String>();
resourceItemReader.setResources(resources);
resourceItemReader.setDelegate(reader);
return resourceItemReader;
}
@Bean
@StepScope
@Qualifier("reader")
public FlatFileItemReader<String> reader() {
FlatFileItemReader<String> reader = new FlatFileItemReader<>();
reader.setLineMapper(new PassThroughLineMapper());
reader.setRecordSeparatorPolicy(new SimpleRecordSeparatorPolicy(){
@Override
public boolean isEndOfRecord(String line){
return super.isEndOfRecord(line);
}
});
return reader;
}
@Bean
@Qualifier("readerPeek")
public SingleItemPeekableItemReader<String> readerPeek() {
SingleItemPeekableItemReader<String> reader = new SingleItemPeekableItemReader<>();
reader.setDelegate(reader());
return reader;
}
我正在尝试为此创建一个 CustomCompletionPolicy。这是 class :-
@Component
public class CustomCompletionPolicy extends CompletionPolicySupport {
private EOFCompletionContext cc;
private PeekableItemReader<String> reader;
public void setReader(PeekableItemReader<String> forseeingReader)
{
this.reader = forseeingReader;
}
@Override
public boolean isComplete(RepeatContext context)
{
return this.cc.isComplete();
}
@Override
public RepeatContext start(RepeatContext context)
{
this.cc = new EOFCompletionContext(context);
return cc;
}
@Override
public void update(RepeatContext context)
{
}
protected class EOFCompletionContext extends RepeatContextSupport
{
boolean endOfFile = false;
public EOFCompletionContext (RepeatContext context)
{
super(context);
}
public boolean isComplete() {
final String next ;
try{
System.out.println("Here I am in update");
next = reader.peek();
} catch (Exception e) {
throw new NonTransientResourceException("Unable to peek", e);
}
if (null == next){
return true;
} else {
return false;
}
}
}
}
这是 MessageItemWriter 的代码:-
@Component
public class MessageItemWriter 扩展 JmsItemWriter{
private static final Logger logger = LoggerFactory.getLogger(MessageItemWriter.class);
@Autowired
@Qualifier("jmsTemplate")
private JmsTemplate jmsTemplate;
@Autowired
private ConfigProps configProps;
private int counter = 0;
@Override
public void write(final List<? extends String> items) throws Exception {
logger.info("In the write "+items+" "+items.size());
//addDelay();
simpleSend(items);
logger.info("completed Posting");
}
public void simpleSend(final List<? extends String> items) {
this.jmsTemplate.send(solaceJMSProps.getQueueName(), new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
int mgsId = ++counter;
TextMessage msg = session.createTextMessage();
String msgContent = items.stream().filter(Objects::nonNull).collect(Collectors.joining(TransactionFileReaderConstants.COMMA));
msg.setText(msgContent);
msg.setIntProperty(TransactionFileReaderConstants.MESSAGE_ID, mgsId);
msg.setJMSTimestamp(System.currentTimeMillis());
logger.info("mgsId"+mgsId);
msg.setJMSMessageID(String.valueOf(mgsId));
logger.info("about to post: "+msg.getJMSMessageID());
return msg;
}
});
}
private void addDelay() throws InterruptedException {
TimeUnit.SECONDS.sleep(configProps.getPauseTime());
}
}
我面临的问题是,当我 运行 这个时,我得到一个错误:
错误org.springframework.batch.core.step.AbstractStep - 在作业 midasFileReaderJob 中执行步骤 processData 时遇到错误
java.lang.WhosebugError: 空
在 org.springframework.batch.item.support.SingleItemPeekableItemReader.update(SingleItemPeekableItemReader.java:145) ~[Transactions-Filereader-Batch-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
在 org.springframework.batch.item.support.SingleItemPeekableItemReader.updateDelegate(SingleItemPeekableItemReader.java:158) ~[Transactions-Filereader-Batch-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
有人能帮我理解为什么我调用 peek() 时会出现这个错误吗?我该如何解决?
谢谢,
希马尼
我深入研究了 MultiResourceItemReader
的源代码(版本 4.2.1),我认为如果不对当前代码进行一些破解,就无法轻松实现。主要是因为当 MultiResourceItemReader
切换到下一个 Resource 读取时,它并没有将这个切换事件暴露给外界,这样其他对象就无法检测到这个事件并做出相应的反应。所有关于检测此事件的状态也是私有的,这意味着即使通过扩展 MultiResourceItemReader
实现自定义 reader,我们仍然无法访问这些信息。
因此,使用PeekableItemReader
实现自定义CompletionPolicy
也不起作用,主要是因为从PeekableItemReader
的角度来看,它仍然不知道什么时候切换事件发生。通过将所有资源视为单个资源,它只知道是否仍有项目可从基础 MultiResourceItemReader
中查看,它不知道是否有可用项目可从当前和单个资源中查看。
我建议你向 spring-batch 官方 github 申请此功能。
或者,您可以更改为使用 MultiResourcePartitioner
按照 this 的建议为每个文件创建一个分区步骤。
由于您将文件本身作为消息发送,我认为没有必要逐行读取每个文件然后在编写器中重新组合这些行。因此,MultiResourceItemReader
方法是不合适的。
我会将一个项目定义为一个文件,并使用自定义 ItemReader<File>
和 JmsItemWriter<File>
。这种做法是:
- 更有效率
- 分区友好
- 并解决了尝试创建自定义完成策略来为整个文件提交事务的问题。
我正在创建一个简单的批处理作业,它读取多个文件并将每个文件的内容作为单独的消息发布到队列中。这是我的代码:-
@Bean
@Qualifier("step1")
public Step step1() {
ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
try {
String files = outputDirectory+"*.txt";
resources = resolver.getResources("file:"+files);
}
catch (Exception e) {
logger.error("Error in the step1",e);
}
customCompletionPolicy.setReader(readerPeek());
return stepBuilderFactory.get("step1")
.<String, String> chunk(customCompletionPolicy)
.reader(multiResourceItemReader())
.writer(messageItemWriter)
.build();
}
@Bean
@StepScope
public MultiResourceItemReader<String> multiResourceItemReader() {
MultiResourceItemReader<String> resourceItemReader = new MultiResourceItemReader<String>();
resourceItemReader.setResources(resources);
resourceItemReader.setDelegate(reader);
return resourceItemReader;
}
@Bean
@StepScope
@Qualifier("reader")
public FlatFileItemReader<String> reader() {
FlatFileItemReader<String> reader = new FlatFileItemReader<>();
reader.setLineMapper(new PassThroughLineMapper());
reader.setRecordSeparatorPolicy(new SimpleRecordSeparatorPolicy(){
@Override
public boolean isEndOfRecord(String line){
return super.isEndOfRecord(line);
}
});
return reader;
}
@Bean
@Qualifier("readerPeek")
public SingleItemPeekableItemReader<String> readerPeek() {
SingleItemPeekableItemReader<String> reader = new SingleItemPeekableItemReader<>();
reader.setDelegate(reader());
return reader;
}
我正在尝试为此创建一个 CustomCompletionPolicy。这是 class :-
@Component
public class CustomCompletionPolicy extends CompletionPolicySupport {
private EOFCompletionContext cc;
private PeekableItemReader<String> reader;
public void setReader(PeekableItemReader<String> forseeingReader)
{
this.reader = forseeingReader;
}
@Override
public boolean isComplete(RepeatContext context)
{
return this.cc.isComplete();
}
@Override
public RepeatContext start(RepeatContext context)
{
this.cc = new EOFCompletionContext(context);
return cc;
}
@Override
public void update(RepeatContext context)
{
}
protected class EOFCompletionContext extends RepeatContextSupport
{
boolean endOfFile = false;
public EOFCompletionContext (RepeatContext context)
{
super(context);
}
public boolean isComplete() {
final String next ;
try{
System.out.println("Here I am in update");
next = reader.peek();
} catch (Exception e) {
throw new NonTransientResourceException("Unable to peek", e);
}
if (null == next){
return true;
} else {
return false;
}
}
}
}
这是 MessageItemWriter 的代码:-
@Component
public class MessageItemWriter 扩展 JmsItemWriter{
private static final Logger logger = LoggerFactory.getLogger(MessageItemWriter.class);
@Autowired
@Qualifier("jmsTemplate")
private JmsTemplate jmsTemplate;
@Autowired
private ConfigProps configProps;
private int counter = 0;
@Override
public void write(final List<? extends String> items) throws Exception {
logger.info("In the write "+items+" "+items.size());
//addDelay();
simpleSend(items);
logger.info("completed Posting");
}
public void simpleSend(final List<? extends String> items) {
this.jmsTemplate.send(solaceJMSProps.getQueueName(), new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
int mgsId = ++counter;
TextMessage msg = session.createTextMessage();
String msgContent = items.stream().filter(Objects::nonNull).collect(Collectors.joining(TransactionFileReaderConstants.COMMA));
msg.setText(msgContent);
msg.setIntProperty(TransactionFileReaderConstants.MESSAGE_ID, mgsId);
msg.setJMSTimestamp(System.currentTimeMillis());
logger.info("mgsId"+mgsId);
msg.setJMSMessageID(String.valueOf(mgsId));
logger.info("about to post: "+msg.getJMSMessageID());
return msg;
}
});
}
private void addDelay() throws InterruptedException {
TimeUnit.SECONDS.sleep(configProps.getPauseTime());
}
}
我面临的问题是,当我 运行 这个时,我得到一个错误:
错误org.springframework.batch.core.step.AbstractStep - 在作业 midasFileReaderJob 中执行步骤 processData 时遇到错误 java.lang.WhosebugError: 空 在 org.springframework.batch.item.support.SingleItemPeekableItemReader.update(SingleItemPeekableItemReader.java:145) ~[Transactions-Filereader-Batch-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?] 在 org.springframework.batch.item.support.SingleItemPeekableItemReader.updateDelegate(SingleItemPeekableItemReader.java:158) ~[Transactions-Filereader-Batch-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
有人能帮我理解为什么我调用 peek() 时会出现这个错误吗?我该如何解决?
谢谢, 希马尼
我深入研究了 MultiResourceItemReader
的源代码(版本 4.2.1),我认为如果不对当前代码进行一些破解,就无法轻松实现。主要是因为当 MultiResourceItemReader
切换到下一个 Resource 读取时,它并没有将这个切换事件暴露给外界,这样其他对象就无法检测到这个事件并做出相应的反应。所有关于检测此事件的状态也是私有的,这意味着即使通过扩展 MultiResourceItemReader
实现自定义 reader,我们仍然无法访问这些信息。
因此,使用PeekableItemReader
实现自定义CompletionPolicy
也不起作用,主要是因为从PeekableItemReader
的角度来看,它仍然不知道什么时候切换事件发生。通过将所有资源视为单个资源,它只知道是否仍有项目可从基础 MultiResourceItemReader
中查看,它不知道是否有可用项目可从当前和单个资源中查看。
我建议你向 spring-batch 官方 github 申请此功能。
或者,您可以更改为使用 MultiResourcePartitioner
按照 this 的建议为每个文件创建一个分区步骤。
由于您将文件本身作为消息发送,我认为没有必要逐行读取每个文件然后在编写器中重新组合这些行。因此,MultiResourceItemReader
方法是不合适的。
我会将一个项目定义为一个文件,并使用自定义 ItemReader<File>
和 JmsItemWriter<File>
。这种做法是:
- 更有效率
- 分区友好
- 并解决了尝试创建自定义完成策略来为整个文件提交事务的问题。