从 s3 轮询文件时处理的重复消息
duplicate message processed when polling files from s3
我正在使用 s3 模块从 s3.It 轮询文件将文件下载到本地系统并开始处理 it.I am 运行 这个在 3 节点集群上,模块计数为 1.Now 假设文件从 s3 下载到本地系统并且 xd 正在处理 it.If xd 节点关闭它会处理一半 message.When 服务器启动它将再次开始处理文件因此我会重复 message.I 我正在尝试使用消息存储更改为幂等模式以将模块计数更改为 3,但仍然存在重复消息问题。
<int:poller fixed-delay="${fixedDelay}" default="true">
<int:advice-chain>
<ref bean="pollAdvise"/>
</int:advice-chain>
</int:poller>
<bean id="pollAdvise" class="org.springframework.integration.scheduling.PollSkipAdvice">
<constructor-arg ref="healthCheckStrategy"/>
</bean>
<bean id="healthCheckStrategy" class="ServiceHealthCheckPollSkipStrategy">
<property name="url" value="${url}"/>
<property name="doHealthCheck" value="${doHealthCheck}"/>
</bean>
<bean id="credentials" class="org.springframework.integration.aws.core.BasicAWSCredentials">
<property name="accessKey" value="${accessKey}"/>
<property name="secretKey" value="${secretKey}"/>
</bean>
<bean id="clientConfiguration" class="com.amazonaws.ClientConfiguration">
<property name="proxyHost" value="${proxyHost}"/>
<property name="proxyPort" value="${proxyPort}"/>
<property name="preemptiveBasicProxyAuth" value="false"/>
</bean>
<bean id="s3Operations" class="org.springframework.integration.aws.s3.core.CustomC1AmazonS3Operations">
<constructor-arg index="0" ref="credentials"/>
<constructor-arg index="1" ref="clientConfiguration"/>
<property name="awsEndpoint" value="s3.amazonaws.com"/>
<property name="temporaryDirectory" value="${temporaryDirectory}"/>
<property name="awsSecurityKey" value=""/>
</bean>
<!-- aws-endpoint="https://s3.amazonaws.com" -->
<int-aws:s3-inbound-channel-adapter aws-endpoint="s3.amazonaws.com"
bucket="${bucket}"
s3-operations="s3Operations"
credentials-ref="credentials"
file-name-wildcard="${fileNameWildcard}"
remote-directory="${remoteDirectory}"
channel="splitChannel"
local-directory="${localDirectory}"
accept-sub-folders="false"
delete-source-files="true"
archive-bucket="${archiveBucket}"
archive-directory="${archiveDirectory}">
</int-aws:s3-inbound-channel-adapter>
<int-file:splitter input-channel="splitChannel" output-channel="output" markers="false" charset="UTF-8">
<int-file:request-handler-advice-chain>
<bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
<property name="onSuccessExpression" value="payload.delete()"/>
</bean>
</int-file:request-handler-advice-chain>
</int-file:splitter>
<int:idempotent-receiver id="expressionInterceptor" endpoint="output"
metadata-store="redisMessageStore"
discard-channel="nullChannel"
throw-exception-on-rejection="false"
key-expression="payload"/>
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisChannelMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.jedis.JedisConnectionFactory">
<property name="port" value="7379" />
</bean>
<int:channel id="output"/>
更新2
此配置对我有用感谢您的帮助。
<int:idempotent-receiver id="s3Interceptor" endpoint="s3splitter"
metadata-store="redisMessageStore"
discard-channel="nullChannel"
throw-exception-on-rejection="false"
key-expression="payload.name"/>
<bean id="redisMessageStore" class="org.springframework.integration.redis.metadata.RedisMetadataStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>
<bean id="redisConnectionFactory"
class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
<property name="port" value="6379" />
</bean>
<int:bridge id="batchBridge" input-channel="bridge" output-channel="output">
</int:bridge>
<int:idempotent-receiver id="splitterInterceptor" endpoint="batchBridge"
metadata-store="redisMessageStore"
discard-channel="nullChannel"
throw-exception-on-rejection="false"
key-expression="payload"/>
<int:channel id="output"/>
我几乎没有怀疑想澄清我是否做对了。
1) 如您所见,我有 ExpressionEvaluatingRequestHandlerAdvice 删除 file.Will 在我将文件读入 Redis 或读取最后一条记录后文件被删除?
2)我使用桌面管理器探索了 redis 我看到了这个 我有一个 MetaData as man Key
(文件和有效负载)元数据存储键和值将相同table这样可以吗?还是应该使用不同的元数据存储?
我可以使用有效载荷的哈希值而不是有效载荷作为密钥吗?有没有像 payload.hash!
看起来它是 的延续,但不幸的是,我们在您的案例中没有看到 <idempotent-receiver>
配置。
根据您的评论,您似乎继续经常使用 SimpleMetadataStore
或清理共享的 (Redis/Mongo)。
你应该分享更多信息去哪里挖掘。一些日志和 DEBUG 调查也很好。
更新
幂等接收器正好适用于 endpoint
。在您的配置中,它用于 MessageChannel
。这就是为什么你没有完成任何适当的工作,因为 MessageChannel
只是从 IdempotentReceiverInterceptor
.
中被忽略了
您应该为 <int-file:splitter>
添加一个 id
并使用 endpoint
属性中的 id
。如果将 File
作为 key
用于幂等性是个好主意,则不应该。 name
听起来更好。
更新 2
If a node goes down and lets assume a file is dowloaded(file size with million records may be gb) to xd node and I would have processed half the records and node crashes .When server comes up I think we will process same records again?
好的。我终于明白你的意思了!您已经对文件中的拆分行有疑问。
此外,我还会为 <splitter>
使用幂等接收器,以避免来自 S3 的重复文件。
要修复您的用例,您应该在 <splitter>
和 output
通道之间放置一个端点 - <bridge>
以使用幂等接收器跳过重复行。
我正在使用 s3 模块从 s3.It 轮询文件将文件下载到本地系统并开始处理 it.I am 运行 这个在 3 节点集群上,模块计数为 1.Now 假设文件从 s3 下载到本地系统并且 xd 正在处理 it.If xd 节点关闭它会处理一半 message.When 服务器启动它将再次开始处理文件因此我会重复 message.I 我正在尝试使用消息存储更改为幂等模式以将模块计数更改为 3,但仍然存在重复消息问题。
<int:poller fixed-delay="${fixedDelay}" default="true">
<int:advice-chain>
<ref bean="pollAdvise"/>
</int:advice-chain>
</int:poller>
<bean id="pollAdvise" class="org.springframework.integration.scheduling.PollSkipAdvice">
<constructor-arg ref="healthCheckStrategy"/>
</bean>
<bean id="healthCheckStrategy" class="ServiceHealthCheckPollSkipStrategy">
<property name="url" value="${url}"/>
<property name="doHealthCheck" value="${doHealthCheck}"/>
</bean>
<bean id="credentials" class="org.springframework.integration.aws.core.BasicAWSCredentials">
<property name="accessKey" value="${accessKey}"/>
<property name="secretKey" value="${secretKey}"/>
</bean>
<bean id="clientConfiguration" class="com.amazonaws.ClientConfiguration">
<property name="proxyHost" value="${proxyHost}"/>
<property name="proxyPort" value="${proxyPort}"/>
<property name="preemptiveBasicProxyAuth" value="false"/>
</bean>
<bean id="s3Operations" class="org.springframework.integration.aws.s3.core.CustomC1AmazonS3Operations">
<constructor-arg index="0" ref="credentials"/>
<constructor-arg index="1" ref="clientConfiguration"/>
<property name="awsEndpoint" value="s3.amazonaws.com"/>
<property name="temporaryDirectory" value="${temporaryDirectory}"/>
<property name="awsSecurityKey" value=""/>
</bean>
<!-- aws-endpoint="https://s3.amazonaws.com" -->
<int-aws:s3-inbound-channel-adapter aws-endpoint="s3.amazonaws.com"
bucket="${bucket}"
s3-operations="s3Operations"
credentials-ref="credentials"
file-name-wildcard="${fileNameWildcard}"
remote-directory="${remoteDirectory}"
channel="splitChannel"
local-directory="${localDirectory}"
accept-sub-folders="false"
delete-source-files="true"
archive-bucket="${archiveBucket}"
archive-directory="${archiveDirectory}">
</int-aws:s3-inbound-channel-adapter>
<int-file:splitter input-channel="splitChannel" output-channel="output" markers="false" charset="UTF-8">
<int-file:request-handler-advice-chain>
<bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
<property name="onSuccessExpression" value="payload.delete()"/>
</bean>
</int-file:request-handler-advice-chain>
</int-file:splitter>
<int:idempotent-receiver id="expressionInterceptor" endpoint="output"
metadata-store="redisMessageStore"
discard-channel="nullChannel"
throw-exception-on-rejection="false"
key-expression="payload"/>
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisChannelMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.jedis.JedisConnectionFactory">
<property name="port" value="7379" />
</bean>
<int:channel id="output"/>
更新2 此配置对我有用感谢您的帮助。
<int:idempotent-receiver id="s3Interceptor" endpoint="s3splitter"
metadata-store="redisMessageStore"
discard-channel="nullChannel"
throw-exception-on-rejection="false"
key-expression="payload.name"/>
<bean id="redisMessageStore" class="org.springframework.integration.redis.metadata.RedisMetadataStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>
<bean id="redisConnectionFactory"
class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
<property name="port" value="6379" />
</bean>
<int:bridge id="batchBridge" input-channel="bridge" output-channel="output">
</int:bridge>
<int:idempotent-receiver id="splitterInterceptor" endpoint="batchBridge"
metadata-store="redisMessageStore"
discard-channel="nullChannel"
throw-exception-on-rejection="false"
key-expression="payload"/>
<int:channel id="output"/>
我几乎没有怀疑想澄清我是否做对了。
1) 如您所见,我有 ExpressionEvaluatingRequestHandlerAdvice 删除 file.Will 在我将文件读入 Redis 或读取最后一条记录后文件被删除?
2)我使用桌面管理器探索了 redis 我看到了这个 我有一个 MetaData as man Key
(文件和有效负载)元数据存储键和值将相同table这样可以吗?还是应该使用不同的元数据存储?
我可以使用有效载荷的哈希值而不是有效载荷作为密钥吗?有没有像 payload.hash!
看起来它是 <idempotent-receiver>
配置。
根据您的评论,您似乎继续经常使用 SimpleMetadataStore
或清理共享的 (Redis/Mongo)。
你应该分享更多信息去哪里挖掘。一些日志和 DEBUG 调查也很好。
更新
幂等接收器正好适用于 endpoint
。在您的配置中,它用于 MessageChannel
。这就是为什么你没有完成任何适当的工作,因为 MessageChannel
只是从 IdempotentReceiverInterceptor
.
您应该为 <int-file:splitter>
添加一个 id
并使用 endpoint
属性中的 id
。如果将 File
作为 key
用于幂等性是个好主意,则不应该。 name
听起来更好。
更新 2
If a node goes down and lets assume a file is dowloaded(file size with million records may be gb) to xd node and I would have processed half the records and node crashes .When server comes up I think we will process same records again?
好的。我终于明白你的意思了!您已经对文件中的拆分行有疑问。
此外,我还会为 <splitter>
使用幂等接收器,以避免来自 S3 的重复文件。
要修复您的用例,您应该在 <splitter>
和 output
通道之间放置一个端点 - <bridge>
以使用幂等接收器跳过重复行。