在 xd 中处理文件流之前的健康检查
health check before processing file stream in xd
我正在从 s3 中提取文件并使用 spring xd 处理它们。我有一个处理器 http 客户端组件,我在其中执行一些 RESTful 请求。现在这种方法的问题是,如果我的 web 服务关闭,文件会在 rabbit mq transport 中累积。因此,在从 s3 中提取每个文件之前,我想先对我的 REST 服务进行健康检查。我该如何解决这个问题?我的配置文件如下所示。
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Source-module configuration: an S3 inbound channel adapter polls a bucket,
  hands each fetched file to a file splitter via "splitChannel", and the
  splitter publishes its results on the "output" channel.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-file="http://www.springframework.org/schema/integration/file"
       xmlns:int-aws="http://www.springframework.org/schema/integration/aws"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
         http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
         http://www.springframework.org/schema/integration/aws http://www.springframework.org/schema/integration/aws/spring-integration-aws-1.0.xsd">

  <!--
    Default poller used by the polling S3 adapter below.
    NOTE(review): a health-check skip advice (PollSkipAdvice with a custom
    strategy) would be attached here through a nested advice-chain; confirm
    the Spring Integration version is 4.1 or later before adding it.
  -->
  <int:poller fixed-delay="${fixed-delay}" default="true"/>

  <!-- Resolves the ${...} placeholders in this file from the properties file. -->
  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="location" value="dms-aws-s3-nonprod.properties"/>
  </bean>

  <!-- AWS credentials shared by the S3 operations bean and the adapter. -->
  <bean id="credentials" class="org.springframework.integration.aws.core.BasicAWSCredentials">
    <property name="accessKey" value="${accessKey}"/>
    <property name="secretKey" value="${secretKey}"/>
  </bean>

  <!-- AWS SDK client settings: outbound proxy, no preemptive proxy auth. -->
  <bean id="clientConfiguration" class="com.amazonaws.ClientConfiguration">
    <property name="proxyHost" value="${proxyHost}"/>
    <property name="proxyPort" value="${proxyPort}"/>
    <property name="preemptiveBasicProxyAuth" value="false"/>
  </bean>

  <!-- Project-specific S3 operations implementation used by the adapter. -->
  <bean id="s3Operations" class="org.springframework.integration.aws.s3.core.CustomC1AmazonS3Operations">
    <constructor-arg index="0" ref="credentials"/>
    <constructor-arg index="1" ref="clientConfiguration"/>
    <property name="awsEndpoint" value="s3.amazonaws.com"/>
    <property name="temporaryDirectory" value="${temporaryDirectory}"/>
    <property name="awsSecurityKey" value="${awsSecurityKey}"/>
  </bean>

  <!-- Alternative endpoint form kept for reference: aws-endpoint="https://s3.amazonaws.com" -->
  <int-aws:s3-inbound-channel-adapter aws-endpoint="s3.amazonaws.com"
                                      bucket="${bucket}"
                                      s3-operations="s3Operations"
                                      credentials-ref="credentials"
                                      file-name-wildcard="${file-name-wildcard}"
                                      remote-directory="${remote-directory}"
                                      channel="splitChannel"
                                      local-directory="${local-directory}"
                                      accept-sub-folders="false"
                                      delete-source-files="true"
                                      archive-bucket="${archive-bucket}"
                                      archive-directory="${archive-directory}"/>

  <!--
    Splits each file arriving on "splitChannel" and sends the pieces to
    "output". markers="true" requests file start/end marker messages in
    addition to the content (see the Spring Integration file splitter docs).
  -->
  <int-file:splitter input-channel="splitChannel" output-channel="output" markers="true"/>

  <int:channel id="output"/>

</beans>
我的流定义
xd-shell>stream create feedTest16 --definition "aws-s3-source |processor-http-client| log" --deploy
从 Spring Integration 4.1 开始,PollSkipAdvice
已被引入。
实现您自己的 ServiceHealthCheckPollSkipStrategy
并将其注入 <poller>
的 <advice-chain>
以供您的 <int-aws:s3-inbound-channel-adapter>
使用,这样就能完全满足您的需求!
唯一的问题是,这样一来您的 s3-source
就与 http-client
的目标服务耦合在一起了……
我正在从 s3 中提取文件并使用 spring xd 处理它们。我有一个处理器 http 客户端组件,我在其中执行一些 RESTful 请求。现在这种方法的问题是,如果我的 web 服务关闭,文件会在 rabbit mq transport 中累积。因此,在从 s3 中提取每个文件之前,我想先对我的 REST 服务进行健康检查。我该如何解决这个问题?我的配置文件如下所示。
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Source-module configuration: an S3 inbound channel adapter polls a bucket,
  hands each fetched file to a file splitter via "splitChannel", and the
  splitter publishes its results on the "output" channel.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-file="http://www.springframework.org/schema/integration/file"
       xmlns:int-aws="http://www.springframework.org/schema/integration/aws"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
         http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
         http://www.springframework.org/schema/integration/aws http://www.springframework.org/schema/integration/aws/spring-integration-aws-1.0.xsd">

  <!--
    Default poller used by the polling S3 adapter below.
    NOTE(review): a health-check skip advice (PollSkipAdvice with a custom
    strategy) would be attached here through a nested advice-chain; confirm
    the Spring Integration version is 4.1 or later before adding it.
  -->
  <int:poller fixed-delay="${fixed-delay}" default="true"/>

  <!-- Resolves the ${...} placeholders in this file from the properties file. -->
  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="location" value="dms-aws-s3-nonprod.properties"/>
  </bean>

  <!-- AWS credentials shared by the S3 operations bean and the adapter. -->
  <bean id="credentials" class="org.springframework.integration.aws.core.BasicAWSCredentials">
    <property name="accessKey" value="${accessKey}"/>
    <property name="secretKey" value="${secretKey}"/>
  </bean>

  <!-- AWS SDK client settings: outbound proxy, no preemptive proxy auth. -->
  <bean id="clientConfiguration" class="com.amazonaws.ClientConfiguration">
    <property name="proxyHost" value="${proxyHost}"/>
    <property name="proxyPort" value="${proxyPort}"/>
    <property name="preemptiveBasicProxyAuth" value="false"/>
  </bean>

  <!-- Project-specific S3 operations implementation used by the adapter. -->
  <bean id="s3Operations" class="org.springframework.integration.aws.s3.core.CustomC1AmazonS3Operations">
    <constructor-arg index="0" ref="credentials"/>
    <constructor-arg index="1" ref="clientConfiguration"/>
    <property name="awsEndpoint" value="s3.amazonaws.com"/>
    <property name="temporaryDirectory" value="${temporaryDirectory}"/>
    <property name="awsSecurityKey" value="${awsSecurityKey}"/>
  </bean>

  <!-- Alternative endpoint form kept for reference: aws-endpoint="https://s3.amazonaws.com" -->
  <int-aws:s3-inbound-channel-adapter aws-endpoint="s3.amazonaws.com"
                                      bucket="${bucket}"
                                      s3-operations="s3Operations"
                                      credentials-ref="credentials"
                                      file-name-wildcard="${file-name-wildcard}"
                                      remote-directory="${remote-directory}"
                                      channel="splitChannel"
                                      local-directory="${local-directory}"
                                      accept-sub-folders="false"
                                      delete-source-files="true"
                                      archive-bucket="${archive-bucket}"
                                      archive-directory="${archive-directory}"/>

  <!--
    Splits each file arriving on "splitChannel" and sends the pieces to
    "output". markers="true" requests file start/end marker messages in
    addition to the content (see the Spring Integration file splitter docs).
  -->
  <int-file:splitter input-channel="splitChannel" output-channel="output" markers="true"/>

  <int:channel id="output"/>

</beans>
我的流定义
xd-shell>stream create feedTest16 --definition "aws-s3-source |processor-http-client| log" --deploy
从 Spring Integration 4.1 开始,PollSkipAdvice
已被引入。
实现您自己的 ServiceHealthCheckPollSkipStrategy
并将其注入 <poller>
的 <advice-chain>
以供您的 <int-aws:s3-inbound-channel-adapter>
使用,这样就能完全满足您的需求!
只有一个问题是您的 s3-source
与 http-client
...