已处理多条消息
Multiple message processed
我有一个 spring xd 源模块,它从 s3 中提取文件并按 line.I 拆分行 spring 配置为 below.But 我有 3 个容器和 1 admin server.Now 我看到每个容器正在处理重复的消息,因为每个容器都在下载自己的副本。
我可以通过将源 s3 模块部署计数为 1 来解决,但我对消息的处理速度越来越慢。有任何输入可以解决这个问题吗?
<int:poller fixed-delay="${fixedDelay}" default="true">
<int:advice-chain>
<ref bean="pollAdvise"/>
</int:advice-chain>
</int:poller>
<bean id="pollAdvise"
</bean>
<bean id="credentials" class="org.springframework.integration.aws.core.BasicAWSCredentials">
<property name="accessKey" value="#{encryptedDatum.decryptBase64Encoded('${accessKey}')}"/>
<property name="secretKey" value="${secretKey}"/>
</bean>
<bean id="clientConfiguration" class="com.amazonaws.ClientConfiguration">
<property name="proxyHost" value="${proxyHost}"/>
<property name="proxyPort" value="${proxyPort}"/>
<property name="preemptiveBasicProxyAuth" value="false"/>
</bean>
<bean id="s3Operations" class="org.springframework.integration.aws.s3.core.CustomC1AmazonS3Operations">
<constructor-arg index="0" ref="credentials"/>
<constructor-arg index="1" ref="clientConfiguration"/>
<property name="awsEndpoint" value="s3.amazonaws.com"/>
<property name="temporaryDirectory" value="${temporaryDirectory}"/>
<property name="awsSecurityKey" value="${awsSecurityKey}"/>
</bean>
<bean id="encryptedDatum" class="abc"/>
<!-- aws-endpoint="https://s3.amazonaws.com" -->
<int-aws:s3-inbound-channel-adapter aws-endpoint="s3.amazonaws.com"
bucket="${bucket}"
s3-operations="s3Operations"
credentials-ref="credentials"
file-name-wildcard="${fileNameWildcard}"
remote-directory="${remoteDirectory}"
channel="splitChannel"
local-directory="${localDirectory}"
accept-sub-folders="false"
delete-source-files="true"
archive-bucket="${archiveBucket}"
archive-directory="${archiveDirectory}">
</int-aws:s3-inbound-channel-adapter>
<int-file:splitter input-channel="splitChannel" output-channel="output" markers="false" charset="UTF-8">
<int-file:request-handler-advice-chain>
<bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
<property name="onSuccessExpression" value="payload.delete()"/>
</bean>
</int-file:request-handler-advice-chain>
</int-file:splitter>
<int:channel id="output"/>
[更新]
我按照你的建议用元数据 store.But 添加了幂等性,因为我的 xd 在 3 个容器集群中是 运行 和 rabbit 简单的元数据存储工作吗?我想我应该使用 reds/mongo 元数据 source.If 我使用 mongo/redis 元数据存储 我怎么能 evict/remove 消息因为消息会随着时间的推移堆积起来?
<int:idempotent-receiver id="expressionInterceptor" endpoint="output"
metadata-store="store"
discard-channel="nullChannel"
throw-exception-on-rejection="false"
key-expression="payload"/>
<bean id="store" class="org.springframework.integration.metadata.SimpleMetadataStore"/>
我建议你看看 Idempotent Receiver
。
这样您就可以使用共享 MetadataStore
并且不接受重复的文件。
应该为您的 <int-file:splitter>
配置 <idempotent-receiver>
。是的:使用丢弃逻辑来避免重复消息。
更新
.But since my xd is running in 3 container cluster with rabbit will simple metadatastore work?
这没关系,因为您从 S3 MessageSource
开始流式传输,所以您应该过滤已经存在的文件。因此你需要外部共享 MetadataStore
.
.If I use mongo/redis metadatastore howcan i evict/remove the messages because messages will pile up over time?
没错。它是幂等接收器逻辑的副作用。如果您使用数据库,不确定这对您来说是个问题...
您可以通过一些定期任务来清理 collection/keys。也许每周一次...
我有一个 spring xd 源模块,它从 s3 中提取文件并按 line.I 拆分行 spring 配置为 below.But 我有 3 个容器和 1 admin server.Now 我看到每个容器正在处理重复的消息,因为每个容器都在下载自己的副本。 我可以通过将源 s3 模块部署计数为 1 来解决,但我对消息的处理速度越来越慢。有任何输入可以解决这个问题吗?
<int:poller fixed-delay="${fixedDelay}" default="true">
<int:advice-chain>
<ref bean="pollAdvise"/>
</int:advice-chain>
</int:poller>
<bean id="pollAdvise"
</bean>
<bean id="credentials" class="org.springframework.integration.aws.core.BasicAWSCredentials">
<property name="accessKey" value="#{encryptedDatum.decryptBase64Encoded('${accessKey}')}"/>
<property name="secretKey" value="${secretKey}"/>
</bean>
<bean id="clientConfiguration" class="com.amazonaws.ClientConfiguration">
<property name="proxyHost" value="${proxyHost}"/>
<property name="proxyPort" value="${proxyPort}"/>
<property name="preemptiveBasicProxyAuth" value="false"/>
</bean>
<bean id="s3Operations" class="org.springframework.integration.aws.s3.core.CustomC1AmazonS3Operations">
<constructor-arg index="0" ref="credentials"/>
<constructor-arg index="1" ref="clientConfiguration"/>
<property name="awsEndpoint" value="s3.amazonaws.com"/>
<property name="temporaryDirectory" value="${temporaryDirectory}"/>
<property name="awsSecurityKey" value="${awsSecurityKey}"/>
</bean>
<bean id="encryptedDatum" class="abc"/>
<!-- aws-endpoint="https://s3.amazonaws.com" -->
<int-aws:s3-inbound-channel-adapter aws-endpoint="s3.amazonaws.com"
bucket="${bucket}"
s3-operations="s3Operations"
credentials-ref="credentials"
file-name-wildcard="${fileNameWildcard}"
remote-directory="${remoteDirectory}"
channel="splitChannel"
local-directory="${localDirectory}"
accept-sub-folders="false"
delete-source-files="true"
archive-bucket="${archiveBucket}"
archive-directory="${archiveDirectory}">
</int-aws:s3-inbound-channel-adapter>
<int-file:splitter input-channel="splitChannel" output-channel="output" markers="false" charset="UTF-8">
<int-file:request-handler-advice-chain>
<bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
<property name="onSuccessExpression" value="payload.delete()"/>
</bean>
</int-file:request-handler-advice-chain>
</int-file:splitter>
<int:channel id="output"/>
[更新] 我按照你的建议用元数据 store.But 添加了幂等性,因为我的 xd 在 3 个容器集群中是 运行 和 rabbit 简单的元数据存储工作吗?我想我应该使用 reds/mongo 元数据 source.If 我使用 mongo/redis 元数据存储 我怎么能 evict/remove 消息因为消息会随着时间的推移堆积起来?
<int:idempotent-receiver id="expressionInterceptor" endpoint="output"
metadata-store="store"
discard-channel="nullChannel"
throw-exception-on-rejection="false"
key-expression="payload"/>
<bean id="store" class="org.springframework.integration.metadata.SimpleMetadataStore"/>
我建议你看看 Idempotent Receiver
。
这样您就可以使用共享 MetadataStore
并且不接受重复的文件。
应该为您的 <int-file:splitter>
配置 <idempotent-receiver>
。是的:使用丢弃逻辑来避免重复消息。
更新
.But since my xd is running in 3 container cluster with rabbit will simple metadatastore work?
这没关系,因为您从 S3 MessageSource
开始流式传输,所以您应该过滤已经存在的文件。因此你需要外部共享 MetadataStore
.
.If I use mongo/redis metadatastore howcan i evict/remove the messages because messages will pile up over time?
没错。它是幂等接收器逻辑的副作用。如果您使用数据库,不确定这对您来说是个问题...
您可以通过一些定期任务来清理 collection/keys。也许每周一次...