Spring 集成 SFTP 通道适配器未将消息发送到下游服务激活器以进行第二次轮询
Spring Integration SFTP channel adapter not sending the message to the down stream service activator for the second polling
我正在尝试实现简单的 SFTP 通道适配器以接受具有任何文件名的文件(以允许重复的文件名)。对于第一次轮询,文件从 SFTP 服务器目录传输到本地目录,并且订阅该频道的服务激活器能够接收消息,但是第二次,如果我在 SFTP 服务器中保留同名文件,则文件已传输,但服务激活器无法获取消息。我试图实现本地过滤器,但它进入了无限循环(轮询文件并创建消息)。
配置
<int:channel id="inboundMGetRecursive">
<int:queue/>
</int:channel>
<int-sftp:inbound-channel-adapter id="sftpInboundAdapter"
auto-startup="true"
channel="channel1"
session-factory="sftpSessionFactory"
local-directory="c:/tmp/"
remote-directory="${sftp.file.remote.inbound.dir}"
auto-create-local-directory="false"
filename-pattern="*.txt"
local-filter="acceptAll">
<int:poller fixed-delay="60000" error-channel="sftpExceptionsChannel"
max-messages-per-poll="-1"/>
<bean id="acceptAll" class="org.springframework.integration.file.filters.AcceptAllFileListFilter"/
<int:service-activator id="serviceActivator" input-channel="channel1" ref="fileMoverHandler"
method="method1">
<int:poller fixed-rate="3000" max-messages-per-poll="-1"/>
</int:service-activator>
无限循环日志
2016-12-14 13:25:39,632 [task-scheduler-1] [handlers.FileFilter] INFO - Filter Activated
2016-12-14 13:25:39,635 [task-scheduler-1] [handlers.FileFilter] INFO - file name : config.txt
2016-12-14 13:25:39,636 [task-scheduler-1] [file.FileReadingMessageSource] DEBUG - Added to queue: [C:\tmp\csvfiles\staging\config.txt]
2016-12-14 13:25:39,636 [task-scheduler-1] [file.FileReadingMessageSource] INFO - Created message: [GenericMessage [payload=C:\tmp\csvfiles\staging\config.txt, headers={id=4a062556-4181-2806-d2ee-c0aa3221aec3, timestamp=1481743539636}]]
2016-12-14 13:25:39,636 [task-scheduler-1] [endpoint.SourcePollingChannelAdapter] DEBUG - Poll resulted in Message: GenericMessage [payload=C:\tmp\csvfiles\staging\config.txt, headers={id=4a062556-4181-2806-d2ee-c0aa3221aec3, timestamp=1481743539636}]
2016-12-14 13:25:39,636 [task-scheduler-1] [channel.QueueChannel] DEBUG - preSend on channel 'inboundMGetRecursive', message: GenericMessage [payload=C:\tmp\csvfiles\staging\config.txt, headers={id=4a062556-4181-2806-d2ee-c0aa3221aec3, timestamp=1481743539636}]
2016-12-14 13:25:39,636 [task-scheduler-1] [channel.QueueChannel] DEBUG - postSend (sent=true) on channel 'inboundMGetRecursive', message: GenericMessage [payload=C:\tmp\csvfiles\staging\config.txt, headers={id=4a062556-4181-2806-d2ee-c0aa3221aec3, timestamp=1481743539636}]
2016-12-14 13:25:39,637 [task-scheduler-1] [handlers.FileFilter] INFO - Filter Activated
2016-12-14 13:25:39,637 [task-scheduler-1] [handlers.FileFilter] INFO - file name : config.txt
2016-12-14 13:25:39,637 [task-scheduler-1] [file.FileReadingMessageSource] DEBUG - Added to queue: [C:\tmp\csvfiles\staging\config.txt]
2016-12-14 13:25:39,638 [task-scheduler-1] [file.FileReadingMessageSource] INFO - Created message: [GenericMessage [payload=C:\tmp\csvfiles\staging\config.txt, headers={id=a2df4814-f759-0440-8ce2-c8d969a9315d, timestamp=1481743539638}]]
2016-12-14 13:25:39,638 [task-scheduler-1] [endpoint.SourcePollingChannelAdapter] DEBUG - Poll resulted in Message: GenericMessage [payload=C:\tmp\csvfiles\staging\config.txt, headers={id=a2df4814-f759-0440-8ce2-c8d969a9315d, timestamp=1481743539638}]
2016-12-14 13:25:39,638 [task-scheduler-1] [channel.QueueChannel] DEBUG - preSend on channel 'inboundMGetRecursive', message: GenericMessage [payload=C:\tmp\csvfiles\staging\config.txt, headers={id=a2df4814-f759-0440-8ce2-c8d969a9315d, timestamp=1481743539638}]
2016-12-14 13:25:39,639 [task-scheduler-1] [channel.QueueChannel] DEBUG - postSend (sent=true) on channel 'inboundMGetRecursive', message: GenericMessage [payload=C:\tmp\csvfiles\staging\config.txt, headers={id=a2df4814-f759-0440-8ce2-c8d969a9315d, timestamp=1481743539638}]
2016-12-14 13:25:39,639 [task-scheduler-1] [handlers.FileFilter] INFO - Filter Activated
2016-12-14 13:25:39,639 [task-scheduler-1] [handlers.FileFilter] INFO - file name : config.txt
2016-12-14 13:25:39,640 [task-scheduler-1] [file.FileReadingMessageSource] DEBUG - Added to queue: [C:\tmp\csvfiles\staging\config.txt]
2016-12-14 13:25:39,640 [task-scheduler-1] [file.FileReadingMessageSource] INFO - Created message: [GenericMessage [payload=C:\tmp\csvfiles\staging\config.txt, headers={id=fd1ace38-6550-82ea-89c5-2a6229463167, timestamp=1481743539640}]]
2016-12-14 13:25:39,640 [task-scheduler-1] [endpoint.SourcePollingChannelAdapter] DEBUG - Poll resulted in Message: GenericMessage [payload=C:\tmp\csvfiles\staging\config.txt, headers={id=fd1ace38-6550-82ea-89c5-2a6229463167, timestamp=1481743539640}]
2016-12-14 13:25:39,640 [task-scheduler-1] [channel.QueueChannel] DEBUG - preSend on channel 'inboundMGetRecursive', message: GenericMessage [payload=C:\tmp\csvfiles\staging\config.txt, headers={id=fd1ace38-6550-82ea-89c5-2a6229463167, timestamp=1481743539640}]
2016-12-14 13:25:39,640 [task-scheduler-1] [channel.QueueChannel] DEBUG - postSend (sent=true) on channel 'inboundMGetRecursive', message: GenericMessage [payload=C:\tmp\csvfiles\staging\config.txt, headers={id=fd1ace38-6550-82ea-89c5-2a6229463167, timestamp=1481743539640}]
请帮助我如何使用过滤器接受所有文件。
使用接受所有本地过滤器时,您需要remove/rename处理完成后从本地磁盘获取文件,然后再进行下一次轮询。
除非您也删除了远程文件,否则您还应该使用 FtpPersistentAcceptOnceFileListFilter
int 远程过滤器,这样在修改后的时间戳发生变化之前不会重新获取文件。
我正在尝试实现简单的 SFTP 通道适配器以接受具有任何文件名的文件(以允许重复的文件名)。对于第一次轮询,文件从 SFTP 服务器目录传输到本地目录,并且订阅该频道的服务激活器能够接收消息,但是第二次,如果我在 SFTP 服务器中保留同名文件,则文件已传输,但服务激活器无法获取消息。我试图实现本地过滤器,但它进入了无限循环(轮询文件并创建消息)。
配置
<int:channel id="inboundMGetRecursive">
<int:queue/>
</int:channel>
<int-sftp:inbound-channel-adapter id="sftpInboundAdapter"
auto-startup="true"
channel="channel1"
session-factory="sftpSessionFactory"
local-directory="c:/tmp/"
remote-directory="${sftp.file.remote.inbound.dir}"
auto-create-local-directory="false"
filename-pattern="*.txt"
local-filter="acceptAll">
<int:poller fixed-delay="60000" error-channel="sftpExceptionsChannel"
max-messages-per-poll="-1"/>
<bean id="acceptAll" class="org.springframework.integration.file.filters.AcceptAllFileListFilter"/
<int:service-activator id="serviceActivator" input-channel="channel1" ref="fileMoverHandler"
method="method1">
<int:poller fixed-rate="3000" max-messages-per-poll="-1"/>
</int:service-activator>
无限循环日志
2016-12-14 13:25:39,632 [task-scheduler-1] [handlers.FileFilter] INFO - Filter Activated
2016-12-14 13:25:39,635 [task-scheduler-1] [handlers.FileFilter] INFO - file name : config.txt
2016-12-14 13:25:39,636 [task-scheduler-1] [file.FileReadingMessageSource] DEBUG - Added to queue: [C:\tmp\csvfiles\staging\config.txt]
2016-12-14 13:25:39,636 [task-scheduler-1] [file.FileReadingMessageSource] INFO - Created message: [GenericMessage [payload=C:\tmp\csvfiles\staging\config.txt, headers={id=4a062556-4181-2806-d2ee-c0aa3221aec3, timestamp=1481743539636}]]
2016-12-14 13:25:39,636 [task-scheduler-1] [endpoint.SourcePollingChannelAdapter] DEBUG - Poll resulted in Message: GenericMessage [payload=C:\tmp\csvfiles\staging\config.txt, headers={id=4a062556-4181-2806-d2ee-c0aa3221aec3, timestamp=1481743539636}]
2016-12-14 13:25:39,636 [task-scheduler-1] [channel.QueueChannel] DEBUG - preSend on channel 'inboundMGetRecursive', message: GenericMessage [payload=C:\tmp\csvfiles\staging\config.txt, headers={id=4a062556-4181-2806-d2ee-c0aa3221aec3, timestamp=1481743539636}]
2016-12-14 13:25:39,636 [task-scheduler-1] [channel.QueueChannel] DEBUG - postSend (sent=true) on channel 'inboundMGetRecursive', message: GenericMessage [payload=C:\tmp\csvfiles\staging\config.txt, headers={id=4a062556-4181-2806-d2ee-c0aa3221aec3, timestamp=1481743539636}]
2016-12-14 13:25:39,637 [task-scheduler-1] [handlers.FileFilter] INFO - Filter Activated
2016-12-14 13:25:39,637 [task-scheduler-1] [handlers.FileFilter] INFO - file name : config.txt
2016-12-14 13:25:39,637 [task-scheduler-1] [file.FileReadingMessageSource] DEBUG - Added to queue: [C:\tmp\csvfiles\staging\config.txt]
2016-12-14 13:25:39,638 [task-scheduler-1] [file.FileReadingMessageSource] INFO - Created message: [GenericMessage [payload=C:\tmp\csvfiles\staging\config.txt, headers={id=a2df4814-f759-0440-8ce2-c8d969a9315d, timestamp=1481743539638}]]
2016-12-14 13:25:39,638 [task-scheduler-1] [endpoint.SourcePollingChannelAdapter] DEBUG - Poll resulted in Message: GenericMessage [payload=C:\tmp\csvfiles\staging\config.txt, headers={id=a2df4814-f759-0440-8ce2-c8d969a9315d, timestamp=1481743539638}]
2016-12-14 13:25:39,638 [task-scheduler-1] [channel.QueueChannel] DEBUG - preSend on channel 'inboundMGetRecursive', message: GenericMessage [payload=C:\tmp\csvfiles\staging\config.txt, headers={id=a2df4814-f759-0440-8ce2-c8d969a9315d, timestamp=1481743539638}]
2016-12-14 13:25:39,639 [task-scheduler-1] [channel.QueueChannel] DEBUG - postSend (sent=true) on channel 'inboundMGetRecursive', message: GenericMessage [payload=C:\tmp\csvfiles\staging\config.txt, headers={id=a2df4814-f759-0440-8ce2-c8d969a9315d, timestamp=1481743539638}]
2016-12-14 13:25:39,639 [task-scheduler-1] [handlers.FileFilter] INFO - Filter Activated
2016-12-14 13:25:39,639 [task-scheduler-1] [handlers.FileFilter] INFO - file name : config.txt
2016-12-14 13:25:39,640 [task-scheduler-1] [file.FileReadingMessageSource] DEBUG - Added to queue: [C:\tmp\csvfiles\staging\config.txt]
2016-12-14 13:25:39,640 [task-scheduler-1] [file.FileReadingMessageSource] INFO - Created message: [GenericMessage [payload=C:\tmp\csvfiles\staging\config.txt, headers={id=fd1ace38-6550-82ea-89c5-2a6229463167, timestamp=1481743539640}]]
2016-12-14 13:25:39,640 [task-scheduler-1] [endpoint.SourcePollingChannelAdapter] DEBUG - Poll resulted in Message: GenericMessage [payload=C:\tmp\csvfiles\staging\config.txt, headers={id=fd1ace38-6550-82ea-89c5-2a6229463167, timestamp=1481743539640}]
2016-12-14 13:25:39,640 [task-scheduler-1] [channel.QueueChannel] DEBUG - preSend on channel 'inboundMGetRecursive', message: GenericMessage [payload=C:\tmp\csvfiles\staging\config.txt, headers={id=fd1ace38-6550-82ea-89c5-2a6229463167, timestamp=1481743539640}]
2016-12-14 13:25:39,640 [task-scheduler-1] [channel.QueueChannel] DEBUG - postSend (sent=true) on channel 'inboundMGetRecursive', message: GenericMessage [payload=C:\tmp\csvfiles\staging\config.txt, headers={id=fd1ace38-6550-82ea-89c5-2a6229463167, timestamp=1481743539640}]
请帮助我如何使用过滤器接受所有文件。
使用接受所有本地过滤器时,您需要remove/rename处理完成后从本地磁盘获取文件,然后再进行下一次轮询。
除非您也删除了远程文件,否则您还应该使用 FtpPersistentAcceptOnceFileListFilter
int 远程过滤器,这样在修改后的时间戳发生变化之前不会重新获取文件。