spring 轮询器中的动态延迟配置
dynamic delay configuration in spring poller
动态轮询
我正在使用从集成到轮询文件的轮询器组件,从 s3.The 固定延迟为 15 分钟,最大消息速率为 1。我这样做的原因是 xd 中的下游消息被堵塞,因为我使用 http.Now 这是 100k 记录文件的文件但是当文件大小很小时我仍然等待 15 分钟虽然我可以处理 fast.Now 有什么方法可以根据 file.Because 的大小动态设置延迟我们不知道哪些文件将被轮询也知道吗?根据我将提取的文件大小或记录数,我们可以动态更改固定延迟或固定速率吗?
<int:poller fixed-delay="${fixedDelay}" default="true" max-messages-per-poll="${maxMessageRate}">
<int:advice-chain>
<ref bean="pollAdvise"/>
</int:advice-chain>
</int:poller>
<bean id="pollAdvise" class="org.springframework.integration.scheduling.PollSkipAdvice">
<constructor-arg ref="healthCheckStrategy"/>
</bean>
<bean id="healthCheckStrategy" class="test.ServiceHealthCheckPollSkipStrategy">
<property name="url" value="${url}"/>
<property name="doHealthCheck" value="${doHealthCheck}"/>
</bean>
<bean id="credentials" class="org.springframework.integration.aws.core.BasicAWSCredentials">
<property name="accessKey" value="${accessKey}"/>
<property name="secretKey" value="${secretKey}"/>
</bean>
<bean id="clientConfiguration" class="com.amazonaws.ClientConfiguration">
<property name="proxyHost" value="${proxyHost}"/>
<property name="proxyPort" value="${proxyPort}"/>
<property name="preemptiveBasicProxyAuth" value="false"/>
</bean>
<bean id="s3Operations" class="org.springframework.integration.aws.s3.core.CustomC1AmazonS3Operations">
<constructor-arg index="0" ref="credentials"/>
<constructor-arg index="1" ref="clientConfiguration"/>
<property name="awsEndpoint" value="s3.amazonaws.com"/>
<property name="temporaryDirectory" value="${temporaryDirectory}"/>
<property name="awsSecurityKey" value="${awsSecurityKey}"/>
</bean>
<!-- aws-endpoint="https://s3.amazonaws.com" -->
<int-aws:s3-inbound-channel-adapter aws-endpoint="s3.amazonaws.com"
bucket="${bucket}"
s3-operations="s3Operations"
credentials-ref="credentials"
file-name-wildcard="${fileNameWildcard}"
remote-directory="${remoteDirectory}"
channel="splitChannel"
local-directory="${localDirectory}"
accept-sub-folders="false"
delete-source-files="true"
archive-bucket="${archiveBucket}"
archive-directory="${archiveDirectory}">
</int-aws:s3-inbound-channel-adapter>
<int-file:splitter id="s3splitter" input-channel="splitChannel" output-channel="bridge" markers="false" charset="UTF-8">
<int-file:request-handler-advice-chain>
<bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
<property name="onSuccessExpression" value="payload.delete()"/>
</bean>
</int-file:request-handler-advice-chain>
</int-file:splitter>
从 Spring Integration 4.2 开始,AbstractMessageSourceAdvice
已 introduced:
This method is called after the receive() method; again, you can reconfigure the source, or take any action perhaps depending on the result (which can be null if there was no message created by the source). You can even return a different message!
从 4.3 版开始,我们引入 CompoundTriggerAdvice
:http://docs.spring.io/spring-integration/docs/4.3.0.BUILD-SNAPSHOT/reference/html/messaging-channels-section.html#_compoundtriggeradvice
您可以根据 payload
大小将其用于您的用例。
动态轮询
我正在使用从集成到轮询文件的轮询器组件,从 s3.The 固定延迟为 15 分钟,最大消息速率为 1。我这样做的原因是 xd 中的下游消息被堵塞,因为我使用 http.Now 这是 100k 记录文件的文件但是当文件大小很小时我仍然等待 15 分钟虽然我可以处理 fast.Now 有什么方法可以根据 file.Because 的大小动态设置延迟我们不知道哪些文件将被轮询也知道吗?根据我将提取的文件大小或记录数,我们可以动态更改固定延迟或固定速率吗?
<int:poller fixed-delay="${fixedDelay}" default="true" max-messages-per-poll="${maxMessageRate}">
<int:advice-chain>
<ref bean="pollAdvise"/>
</int:advice-chain>
</int:poller>
<bean id="pollAdvise" class="org.springframework.integration.scheduling.PollSkipAdvice">
<constructor-arg ref="healthCheckStrategy"/>
</bean>
<bean id="healthCheckStrategy" class="test.ServiceHealthCheckPollSkipStrategy">
<property name="url" value="${url}"/>
<property name="doHealthCheck" value="${doHealthCheck}"/>
</bean>
<bean id="credentials" class="org.springframework.integration.aws.core.BasicAWSCredentials">
<property name="accessKey" value="${accessKey}"/>
<property name="secretKey" value="${secretKey}"/>
</bean>
<bean id="clientConfiguration" class="com.amazonaws.ClientConfiguration">
<property name="proxyHost" value="${proxyHost}"/>
<property name="proxyPort" value="${proxyPort}"/>
<property name="preemptiveBasicProxyAuth" value="false"/>
</bean>
<bean id="s3Operations" class="org.springframework.integration.aws.s3.core.CustomC1AmazonS3Operations">
<constructor-arg index="0" ref="credentials"/>
<constructor-arg index="1" ref="clientConfiguration"/>
<property name="awsEndpoint" value="s3.amazonaws.com"/>
<property name="temporaryDirectory" value="${temporaryDirectory}"/>
<property name="awsSecurityKey" value="${awsSecurityKey}"/>
</bean>
<!-- aws-endpoint="https://s3.amazonaws.com" -->
<int-aws:s3-inbound-channel-adapter aws-endpoint="s3.amazonaws.com"
bucket="${bucket}"
s3-operations="s3Operations"
credentials-ref="credentials"
file-name-wildcard="${fileNameWildcard}"
remote-directory="${remoteDirectory}"
channel="splitChannel"
local-directory="${localDirectory}"
accept-sub-folders="false"
delete-source-files="true"
archive-bucket="${archiveBucket}"
archive-directory="${archiveDirectory}">
</int-aws:s3-inbound-channel-adapter>
<int-file:splitter id="s3splitter" input-channel="splitChannel" output-channel="bridge" markers="false" charset="UTF-8">
<int-file:request-handler-advice-chain>
<bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
<property name="onSuccessExpression" value="payload.delete()"/>
</bean>
</int-file:request-handler-advice-chain>
</int-file:splitter>
从 Spring Integration 4.2 开始,AbstractMessageSourceAdvice
已 introduced:
This method is called after the receive() method; again, you can reconfigure the source, or take any action perhaps depending on the result (which can be null if there was no message created by the source). You can even return a different message!
从 4.3 版开始,我们引入 CompoundTriggerAdvice
:http://docs.spring.io/spring-integration/docs/4.3.0.BUILD-SNAPSHOT/reference/html/messaging-channels-section.html#_compoundtriggeradvice
您可以根据 payload
大小将其用于您的用例。