设置 CompletionPolicy 以提交整个文件

set CompletionPolicy to commit entire file

我正在创建一个简单的批处理作业,它读取多个文件并将每个文件的内容作为单独的消息发布到队列中。这是我的代码:-

    @Bean
    @Qualifier("step1")
    public Step step1() {
        ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        try {
            String files = outputDirectory+"*.txt";
            resources = resolver.getResources("file:"+files);
        }
        catch (Exception e) {
            logger.error("Error in the step1",e);
        }  
        customCompletionPolicy.setReader(readerPeek());
        return stepBuilderFactory.get("step1")
        .<String, String> chunk(customCompletionPolicy)
        .reader(multiResourceItemReader())             
        .writer(messageItemWriter)         
        .build();
    }


    @Bean
    @StepScope
    public MultiResourceItemReader<String> multiResourceItemReader() {
        MultiResourceItemReader<String> resourceItemReader = new MultiResourceItemReader<String>();
        resourceItemReader.setResources(resources);
        resourceItemReader.setDelegate(reader);
        return resourceItemReader;
    }


    @Bean
    @StepScope
    @Qualifier("reader")
    public FlatFileItemReader<String> reader() {    
        FlatFileItemReader<String> reader = new FlatFileItemReader<>();
        reader.setLineMapper(new PassThroughLineMapper());
        reader.setRecordSeparatorPolicy(new SimpleRecordSeparatorPolicy(){
            @Override
            public boolean isEndOfRecord(String line){
                return super.isEndOfRecord(line);
            }
        });
        return reader;
    }

    @Bean
    @Qualifier("readerPeek")
    public SingleItemPeekableItemReader<String> readerPeek() {
        SingleItemPeekableItemReader<String> reader = new SingleItemPeekableItemReader<>();
        reader.setDelegate(reader());
        return reader;
    }

我正在尝试为此创建一个 CustomCompletionPolicy。这是 class :-

    @Component
public class CustomCompletionPolicy extends CompletionPolicySupport {

    private EOFCompletionContext cc;
    private PeekableItemReader<String> reader;

    public void setReader(PeekableItemReader<String> forseeingReader)
    {
        this.reader = forseeingReader;
    }

    @Override
    public boolean isComplete(RepeatContext context)
    {
        return this.cc.isComplete();
    }

    @Override
    public RepeatContext start(RepeatContext context)
    {
        this.cc = new EOFCompletionContext(context);
        return cc;
    }

    @Override
    public void update(RepeatContext context)
    {
    }

    protected class EOFCompletionContext extends RepeatContextSupport
    {
        boolean endOfFile = false;
        public EOFCompletionContext (RepeatContext context)
        {
            super(context);
        }

            public boolean isComplete() {
            final String next ;
            try{
                System.out.println("Here I am in update");
                next = reader.peek();
            } catch (Exception e) {
                throw new NonTransientResourceException("Unable to peek", e);
            }
            if (null == next){
                return true;
            } else {
                return false;
            }
        }
    }
}

这是 MessageItemWriter 的代码:-

@Component

public class MessageItemWriter 扩展 JmsItemWriter{

private static final  Logger logger = LoggerFactory.getLogger(MessageItemWriter.class);

@Autowired
@Qualifier("jmsTemplate")
private JmsTemplate jmsTemplate;

@Autowired
private ConfigProps configProps;    


private int counter = 0;


@Override
public void write(final List<? extends String> items) throws Exception {
    logger.info("In the write "+items+"  "+items.size());
    //addDelay();
    simpleSend(items);
    logger.info("completed Posting");
}

public void simpleSend(final List<? extends String> items) {
    this.jmsTemplate.send(solaceJMSProps.getQueueName(), new MessageCreator() {
        public Message createMessage(Session session) throws JMSException {
            int mgsId = ++counter;
            TextMessage msg = session.createTextMessage();
            String msgContent = items.stream().filter(Objects::nonNull).collect(Collectors.joining(TransactionFileReaderConstants.COMMA));
            msg.setText(msgContent);
            msg.setIntProperty(TransactionFileReaderConstants.MESSAGE_ID, mgsId);
            msg.setJMSTimestamp(System.currentTimeMillis());
            logger.info("mgsId"+mgsId);
            msg.setJMSMessageID(String.valueOf(mgsId));
            logger.info("about to post: "+msg.getJMSMessageID());
            return msg;
        }
    });
}

private void addDelay() throws InterruptedException {
    TimeUnit.SECONDS.sleep(configProps.getPauseTime());
}

}

我面临的问题是,当我 运行 这个时,我得到一个错误:

错误org.springframework.batch.core.step.AbstractStep - 在作业 midasFileReaderJob 中执行步骤 processData 时遇到错误 java.lang.WhosebugError: 空 在 org.springframework.batch.item.support.SingleItemPeekableItemReader.update(SingleItemPeekableItemReader.java:145) ~[Transactions-Filereader-Batch-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?] 在 org.springframework.batch.item.support.SingleItemPeekableItemReader.updateDelegate(SingleItemPeekableItemReader.java:158) ~[Transactions-Filereader-Batch-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]

有人能帮我理解为什么我调用 peek() 时会出现这个错误吗?我该如何解决?

谢谢, 希马尼

我深入研究了 MultiResourceItemReader 的源代码(版本 4.2.1),我认为如果不对当前代码进行一些破解,就无法轻松实现。主要是因为当 MultiResourceItemReader 切换到下一个 Resource 读取时,它并没有将这个切换事件暴露给外界,这样其他对象就无法检测到这个事件并做出相应的反应。所有关于检测此事件的状态也是私有的,这意味着即使通过扩展 MultiResourceItemReader 实现自定义 reader,我们仍然无法访问这些信息。

因此,使用PeekableItemReader实现自定义CompletionPolicy也不起作用,主要是因为从PeekableItemReader的角度来看,它仍然不知道什么时候切换事件发生。通过将所有资源视为单个资源,它只知道是否仍有项目可从基础 MultiResourceItemReader 中查看,它不知道是否有可用项目可从当前和单个资源中查看。

我建议你向 spring-batch 官方 github 申请此功能。

或者,您可以更改为使用 MultiResourcePartitioner 按照 this 的建议为每个文件创建一个分区步骤。

由于您将文件本身作为消息发送,我认为没有必要逐行读取每个文件然后在编写器中重新组合这些行。因此,MultiResourceItemReader 方法是不合适的。

我会将一个项目定义为一个文件,并使用自定义 ItemReader<File>JmsItemWriter<File>。这种做法是:

  • 更有效率
  • 分区友好
  • 并解决了尝试创建自定义完成策略来为整个文件提交事务的问题。