如何在 spring 批处理和 spring 集成中从 itemWriter 写入输出通道?
how to write output channel from itemWriter in spring batch and spring integration?
首先感谢您的关注,我结合了spring批处理和spring集成,我定义了一个作业流程并从ftp适配器中检索文件并发送到jobChannel,并对其进行处理使用 spring 批处理,我想写入输出通道并在处理后使用通道,我的代码是:
<int-ftp:outbound-gateway id="gatewayGET"
local-directory-expression="'./backup/' +#remoteDirectory"
session-factory="ftpSessionFactory"
request-channel="toGetChannel"
reply-channel="toProcessChannel"
command="get"
temporary-file-suffix=".writing"
command-options="-P"
expression="payload.remoteDirectory + '/' + payload.filename"/>
<int:channel id="toProcessChannel">
<int:interceptors>
<int:wire-tap channel="logger2"/>
</int:interceptors>
</int:channel>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="replyJobChannel"/>
<service-activator input-channel="jobLaunchReplyChannel"/>
<int:transformer input-channel="toProcessChannel" output-channel="outboundJobRequestChannel">
<bean class="ir.isc.macna.configuration.FileMessageToJobRequest">
<property name="fileParameterName" value="fileName"/>
</bean>
</int:transformer>
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
reply-channel="jobLaunchReplyChannel"/>
我的作者代码是:
@Component
@StepScope
public class MacnaFileWriter implements ChunkMessageChannelItemWriter<FileInfo> {
@Autowired
@Qualifier("replyJobChannel")
private MessageChannel messageChannel;
@Override
public void write(List<? extends FileInfo> list) throws Exception {
// send Message to replyJobChannel channel with Send method
}
}
并使用ftp适配器在服务器上写入文件:
<int-ftp:outbound-gateway session-factory="ftpSessionFactory"
request-channel="replyJob"
command="mput"
auto-create-directory="true"
expression="payload"
remote-directory-expression="payload.remoteDirectory + '/' + payload.filename + '.out'"
reply-channel="output"/>
这是 运行 批处理作业和使用结果的标准方法吗?
MessageChannel
有 send
方法。为此,您可以使用 MessageBuilder
为您的 payload
.
创建一个 Message<?>
因为它只是一个 <channel>
与任何其他 MessageChannel
消费没有区别。像这样定义消费者的标准方式:
<service-activator input-channel="jobLaunchReplyChannel"/>
然而,当此频道特定于来自 <batch-int:job-launching-gateway>
的结果(JobLaunchingGateway
class):
jobExecution = this.jobLaunchingMessageHandler.launch(jobLaunchRequest);
因此,jobExecution
被发送到 jobLaunchReplyChannel
。
请参阅 Spring Batch Integration 文档。
也许您需要 AsyncItemWriter
或 ChunkMessageChannelItemWriter
?
首先感谢您的关注,我结合了spring批处理和spring集成,我定义了一个作业流程并从ftp适配器中检索文件并发送到jobChannel,并对其进行处理使用 spring 批处理,我想写入输出通道并在处理后使用通道,我的代码是:
<int-ftp:outbound-gateway id="gatewayGET"
local-directory-expression="'./backup/' +#remoteDirectory"
session-factory="ftpSessionFactory"
request-channel="toGetChannel"
reply-channel="toProcessChannel"
command="get"
temporary-file-suffix=".writing"
command-options="-P"
expression="payload.remoteDirectory + '/' + payload.filename"/>
<int:channel id="toProcessChannel">
<int:interceptors>
<int:wire-tap channel="logger2"/>
</int:interceptors>
</int:channel>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="replyJobChannel"/>
<service-activator input-channel="jobLaunchReplyChannel"/>
<int:transformer input-channel="toProcessChannel" output-channel="outboundJobRequestChannel">
<bean class="ir.isc.macna.configuration.FileMessageToJobRequest">
<property name="fileParameterName" value="fileName"/>
</bean>
</int:transformer>
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
reply-channel="jobLaunchReplyChannel"/>
我的作者代码是:
@Component
@StepScope
public class MacnaFileWriter implements ChunkMessageChannelItemWriter<FileInfo> {
@Autowired
@Qualifier("replyJobChannel")
private MessageChannel messageChannel;
@Override
public void write(List<? extends FileInfo> list) throws Exception {
// send Message to replyJobChannel channel with Send method
}
}
并使用ftp适配器在服务器上写入文件:
<int-ftp:outbound-gateway session-factory="ftpSessionFactory"
request-channel="replyJob"
command="mput"
auto-create-directory="true"
expression="payload"
remote-directory-expression="payload.remoteDirectory + '/' + payload.filename + '.out'"
reply-channel="output"/>
这是 运行 批处理作业和使用结果的标准方法吗?
MessageChannel
有 send
方法。为此,您可以使用 MessageBuilder
为您的 payload
.
Message<?>
因为它只是一个 <channel>
与任何其他 MessageChannel
消费没有区别。像这样定义消费者的标准方式:
<service-activator input-channel="jobLaunchReplyChannel"/>
然而,当此频道特定于来自 <batch-int:job-launching-gateway>
的结果(JobLaunchingGateway
class):
jobExecution = this.jobLaunchingMessageHandler.launch(jobLaunchRequest);
因此,jobExecution
被发送到 jobLaunchReplyChannel
。
请参阅 Spring Batch Integration 文档。
也许您需要 AsyncItemWriter
或 ChunkMessageChannelItemWriter
?