如何在 spring 批处理和 spring 集成中从 itemWriter 写入输出通道?

how to write output channel from itemWriter in spring batch and spring integration?

首先感谢您的关注,我结合了spring批处理和spring集成,我定义了一个作业流程并从ftp适配器中检索文件并发送到jobChannel,并对其进行处理使用 spring 批处理,我想写入输出通道并在处理后使用通道,我的代码是:

     <int-ftp:outbound-gateway id="gatewayGET"
                                  local-directory-expression="'./backup/' +#remoteDirectory"
                                  session-factory="ftpSessionFactory"
                                  request-channel="toGetChannel"
                                  reply-channel="toProcessChannel"
                                  command="get"
                                  temporary-file-suffix=".writing"
                                  command-options="-P"
                                  expression="payload.remoteDirectory + '/' + payload.filename"/>
        <int:channel id="toProcessChannel">
            <int:interceptors>
                <int:wire-tap channel="logger2"/>
            </int:interceptors>
        </int:channel>
    <int:channel id="outboundJobRequestChannel"/>
    <int:channel id="replyJobChannel"/>
   <service-activator input-channel="jobLaunchReplyChannel"/>
 <int:transformer input-channel="toProcessChannel" output-channel="outboundJobRequestChannel">
        <bean class="ir.isc.macna.configuration.FileMessageToJobRequest">
            <property name="fileParameterName" value="fileName"/>
        </bean>
    </int:transformer>
    <batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
                                     reply-channel="jobLaunchReplyChannel"/>

我的作者代码是:

@Component
@StepScope
public class MacnaFileWriter implements ChunkMessageChannelItemWriter<FileInfo> {

    @Autowired
    @Qualifier("replyJobChannel")
    private MessageChannel messageChannel;
    @Override
    public void write(List<? extends FileInfo> list) throws Exception {

       // send Message to replyJobChannel channel with Send method
    }
}

并使用ftp适配器在服务器上写入文件:

<int-ftp:outbound-gateway session-factory="ftpSessionFactory"
                              request-channel="replyJob"
                              command="mput"
                              auto-create-directory="true"
                              expression="payload"
                              remote-directory-expression="payload.remoteDirectory + '/' + payload.filename + '.out'"
                              reply-channel="output"/>

这是 运行 批处理作业和使用结果的标准方法吗?

MessageChannelsend 方法。为此,您可以使用 MessageBuilder 为您的 payload.

创建一个 Message<?>

因为它只是一个 <channel> 与任何其他 MessageChannel 消费没有区别。像这样定义消费者的标准方式:

<service-activator input-channel="jobLaunchReplyChannel"/>

然而,当此频道特定于来自 <batch-int:job-launching-gateway> 的结果(JobLaunchingGateway class):

jobExecution = this.jobLaunchingMessageHandler.launch(jobLaunchRequest);

因此,jobExecution 被发送到 jobLaunchReplyChannel

请参阅 Spring Batch Integration 文档。

也许您需要 AsyncItemWriterChunkMessageChannelItemWriter