File getting deleted before processing

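I have a Spring XD module, running with the Rabbit transport, that pulls a file from S3, splits it line by line, and deletes it after processing (using an ExpressionEvaluatingRequestHandlerAdvice). The file in S3 contains about 1 million messages (lines). The file is downloaded to the XD container box; I checked the md5sum and it is identical, with the same number of lines.

I see only about 260k messages reaching the output channel, i.e. the processor, so I am losing about 740k messages. The behaviour is random: sometimes I see all 1 million messages in my output channel, sometimes only 250k. I am measuring this with a counter on my stream.

The file is downloaded, but I feel it is being deleted within about 10 seconds, before all records/lines have been processed. My file size is about 700 MB. Please let me know if the expression advice is deleting the file before processing.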

 module.aws-s3-source.count=1 and module.aws-s3-source.concurrency=70
    stream1 aws-s3-source |processor|sink

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int-aws="http://www.springframework.org/schema/integration/aws"
       xmlns:int-file="http://www.springframework.org/schema/integration/file"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/integration/file
         http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
             http://www.springframework.org/schema/context
             http://www.springframework.org/schema/context/spring-context.xsd
         http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/aws http://www.springframework.org/schema/integration/aws/spring-integration-aws-1.0.xsd">


    <context:property-placeholder location="classpath*:test-${region}.properties" />

    <int:poller fixed-delay="${fixedDelay}" default="true">
        <int:advice-chain>
            <ref bean="pollAdvise"/>

        </int:advice-chain>
    </int:poller>


    <bean id="pollAdvise" class="org.springframework.integration.scheduling.PollSkipAdvice">
        <constructor-arg ref="healthCheckStrategy"/>

    </bean>



    <bean id="healthCheckStrategy" class="test.ServiceHealthCheckPollSkipStrategy">
        <property name="url" value="${url}"/>
        <property name="doHealthCheck" value="${doHealthCheck}"/>
        <property name="restTemplate" ref="restTemplate"/>

    </bean>

    <bean id="restTemplate"
          class="org.springframework.web.client.RestTemplate">
        <constructor-arg ref="requestFactory"/>

    </bean>


    <bean id="requestFactory"
          class="test.BatchClientHttpRequestFactory">
        <constructor-arg ref="verifier"/>

    </bean>

    <bean id="verifier"
          class="test.NullHostnameVerifier">

    </bean>


    <bean id="encryptedDatum" class="test.EncryptedSecuredDatum"/>




    <bean id="clientConfiguration" class="com.amazonaws.ClientConfiguration">
        <property name="proxyHost" value="${proxyHost}"/>
        <property name="proxyPort" value="${proxyPort}"/>
        <property name="preemptiveBasicProxyAuth" value="false"/>
    </bean>

    <bean id="s3Operations" class="test.CustomC1AmazonS3Operations">

        <constructor-arg index="0" ref="clientConfiguration"/>
        <property name="awsEndpoint" value="s3.amazonaws.com"/>
        <property name="temporaryDirectory" value="${temporaryDirectory}"/>
        <property name="awsSecurityKey"  value=""/>
    </bean>


    <bean id="credentials" class="org.springframework.integration.aws.core.BasicAWSCredentials">

    </bean>

    <int-aws:s3-inbound-channel-adapter aws-endpoint="s3.amazonaws.com"
                                        bucket="${bucket}"
                                        s3-operations="s3Operations"
                                        credentials-ref="credentials"
                                        file-name-wildcard="${fileNameWildcard}"
                                        remote-directory="${prefix}"
                                        channel="splitChannel"
                                        local-directory="${localDirectory}"
                                        accept-sub-folders="false"
                                        delete-source-files="true"
                                        archive-bucket="${archiveBucket}"
                                        archive-directory="${archiveDirectory}">
    </int-aws:s3-inbound-channel-adapter>

    <int-file:splitter input-channel="splitChannel" output-channel="output" markers="false" charset="UTF-8">

        <int-file:request-handler-advice-chain>
            <bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
                <property name="onSuccessExpression" value="payload.delete()"/>
            </bean>
        </int-file:request-handler-advice-chain>

    </int-file:splitter>

    <int:channel-interceptor pattern="*" order="3">
        <bean class="org.springframework.integration.channel.interceptor.WireTap">
            <constructor-arg ref="loggingChannel" />
        </bean>
    </int:channel-interceptor>
    <int:logging-channel-adapter id="loggingChannel" log-full-message="true" level="INFO"/>

    <int:channel id="output"/>

</beans>

Update 2:

My stream is as follows: aws-s3-source|processor|http-client|processor> queue:testQueue

1) Now I have split the stream as follows:

 aws-s3-source> queue:s3Queue

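I am able to read all 1 million messages very quickly.

2) Now I added one more stream, shown below, and I see the problem again: S3 stops pulling files and messages are lost every time.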

 queue:s3Queue>processor|http-client| processor> queue:testQueue 

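3) My observation is that the issue comes back when I add http-client, i.e. some messages from the input source are lost.

4) Now I have split the file into 5 files of 125 MB each instead of one 660 MB file, i.e. 5 files of 200k records each. I do not see the issue; I receive all the messages.

I also see a lot of messages backing up in the queue in front of http-client. I am wondering whether this is related to memory or threading inside XD?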

Please let me know if expression advice is deleting before processing.

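No; the advice is an around advice on the message handler; it cannot execute (evaluate the expression) until the splitter has emitted all of the lines.

Is it possible that the file is being pulled from S3 before it has been completely written?

To debug this, I suggest changing the advice to send the file to another subflow and doing some analysis/logging there before deleting it.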
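
A minimal sketch of that debugging change, intended to replace the existing splitter definition inside the same `<beans>` element. The channel ids `afterSplitChannel` and `splitDoneLog` and the log message text are hypothetical names chosen for illustration. It assumes the advice's `successChannel` property receives the result of `onSuccessExpression`, so the expression returns the `File` instead of deleting it:

```xml
<int-file:splitter input-channel="splitChannel" output-channel="output" markers="false" charset="UTF-8">
    <int-file:request-handler-advice-chain>
        <bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
            <!-- Return the File itself; deletion moves to the subflow below -->
            <property name="onSuccessExpression" value="payload"/>
            <!-- Hypothetical channel id: receives the expression result after the splitter returns -->
            <property name="successChannel" ref="afterSplitChannel"/>
        </bean>
    </int-file:request-handler-advice-chain>
</int-file:splitter>

<!-- Subflow: log the state of the file first, then delete it -->
<int:channel id="afterSplitChannel">
    <int:interceptors>
        <int:wire-tap channel="splitDoneLog"/>
    </int:interceptors>
</int:channel>

<int:logging-channel-adapter id="splitDoneLog" level="INFO"
    expression="'Split finished: ' + payload.absolutePath + ', exists=' + payload.exists() + ', bytes=' + payload.length()"/>

<!-- The Boolean result of delete() is discarded via nullChannel -->
<int:service-activator input-channel="afterSplitChannel"
    expression="payload.delete()" output-channel="nullChannel"/>
```

Comparing the logged byte count and timestamp with the size of the object in S3 and with the counter on the stream should show whether the file was already truncated when the splitter read it, or whether the lines were lost further downstream.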