高并发调用 HTTP 接口时丢失消息

Losing messages when making HTTP calls under high concurrency

你好,我有如下所示的流定义:我从 S3 中逐行读取文件,调用 HTTP 客户端,然后把结果放入命名通道(named channel)。我的传输层(transport)是 Rabbit,预取数(prefetch)是 10,HTTP 模块的并发数是 100,运行在 3 个容器和 1 个 admin 节点上。

stream aws-s3|custom processor| custom-http-client --url1=https://test1.com --url2=https://test1.com --filterAttribute=messageAttribute --httpMethod=POST --nonRetryErrorCodes=400,401,404,500 --charset=UTF-8 --replyTimeout=30000 --mapHeaders=Api-Key,Content-Type --requestTimeOut=30000  |processor> queue:testQueue

我的 http-config 如下所示,使用 Apache HTTP 客户端实现连接池和多线程。所有需要重试的错误(如套接字超时)我都放回 DLQ 并重试;所有不重试的错误(50x)则在调用外部 REST API 之后传递到下一个模块并写入错误队列。但是我在丢失消息:我发送了大约 220k 条消息,有时收到 200k 条,有时收到全部 220k 条,有时收到 210k 条,结果是随机的。不确定我是否做错了什么。我已经尝试增加请求超时和套接字超时。在 HTTP 模块之前的处理器中我能收到所有消息,但在 HTTP 客户端之后,我在命名通道的队列中看到的消息变少了,而错误队列中没有任何内容。我很确定消息是在调用 http-client 之后丢失的。这种情况发生在数据负载很高时(例如 200k 以上乃至百万条记录);而在较小的负载(如 500 到 1000 条记录)下我没有看到这个问题。

<beans:beans xmlns="http://www.springframework.org/schema/integration"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beans="http://www.springframework.org/schema/beans"
             xmlns:int-http="http://www.springframework.org/schema/integration/http"
             xmlns:context="http://www.springframework.org/schema/context"
             xsi:schemaLocation="http://www.springframework.org/schema/beans
                http://www.springframework.org/schema/beans/spring-beans.xsd
                http://www.springframework.org/schema/integration
                http://www.springframework.org/schema/integration/spring-integration.xsd
                http://www.springframework.org/schema/integration/http
                http://www.springframework.org/schema/integration/http/spring-integration-http.xsd
                http://www.springframework.org/schema/context
               http://www.springframework.org/schema/context/spring-context-4.0.xsd">

    <!-- Module flow defined in this file:
         input -> (strip x-death header) -> inputX -> gw -> toHttp -> HTTP outbound gateway -> output.
         Failures raised behind the gateway are routed to the 'errors' channel and handled by
         responseInterceptor. -->

    <!--    <context:property-placeholder location="${xd.module.config.location}\processor${xd.module.name}\batch-http.properties"
         ignore-resource-not-found="true" local-override="true"/> -->

    <!-- Resolves the ${...} placeholders used throughout this file. -->
    <context:property-placeholder />

   <!-- logger changes start -->
    <!-- Wire-taps every channel (pattern="*") to loggingChannel, which logs the full message at ERROR level.
         NOTE(review): this logs every message on every channel; at the volumes described (200k+ messages)
         that is a lot of ERROR output. Consider narrowing the pattern or lowering the level once
         diagnosis is done. -->
    <channel-interceptor pattern="*" order="3">
        <beans:bean class="org.springframework.integration.channel.interceptor.WireTap">
            <beans:constructor-arg ref="loggingChannel" />
        </beans:bean>
    </channel-interceptor>

    <logging-channel-adapter id="loggingChannel" log-full-message="true" level="ERROR"/>

<!-- logger changes end -->


    <!-- Removes the x-death header from messages arriving on 'input' before they continue on 'inputX'. -->
    <header-filter input-channel="input"
                   output-channel="inputX" header-names="x-death"/>

    <!-- Messages on 'inputX' are handed to the 'gw' gateway, which forwards them to 'toHttp' and sends
         failures to the 'errors' channel. default-reply-timeout is 0 and the service-activator has no
         output-channel; the HTTP gateway below publishes its reply to 'output' through its own
         reply-channel attribute. -->
    <service-activator input-channel="inputX" ref="gw" />

    <gateway id="gw" default-request-channel="toHttp" default-reply-timeout="0"  error-channel="errors" />

    <!-- Carries the configured list of status codes that must not be retried. -->
    <beans:bean id="inputfields" class="test.HTTPInputProperties">
        <beans:property name="nonRetryErrorCodes" value="${nonRetryErrorCodes}"/>
    </beans:bean>
    <beans:bean id="responseInterceptor" class="test.ResponseInterceptor">
        <beans:property name="inputProperties" ref="inputfields" />
    </beans:bean>

    <chain input-channel="errors" output-channel="output">
        <!-- examine payload.cause (http status code etc) and decide whether
             to throw an exception or return the status code for sending to output -->
        <header-filter header-names="replyChannel, errorChannel" />
        <transformer ref="responseInterceptor"  />
    </chain>


    <!-- Sends each message from 'toHttp' to url1 when the payload contains ${filterAttribute}, otherwise
         to url2, using batchRestTemplate; the String response is published to 'output'. -->
    <int-http:outbound-gateway id='batch-http'  header-mapper="headerMapper"
                               request-channel='toHttp'
                               rest-template="batchRestTemplate"
                               url-expression="payload.contains('${filterAttribute}') ? '${url1}' : '${url2}'"  http-method="${httpMethod}"
                               expected-response-type='java.lang.String' charset='${charset}'
                               reply-timeout='${replyTimeout}' reply-channel='output'>
    </int-http:outbound-gateway>

    <!-- NOTE(review): no other bean in this file references batchHTTPConverter. The media type
         "application/json;UTF-8" also looks like it was meant to be "application/json;charset=UTF-8";
         confirm before relying on it. -->
    <beans:bean  id="batchHTTPConverter" class="org.springframework.http.converter.StringHttpMessageConverter" >
        <beans:constructor-arg index="0"  value="${charset}"/>
        <beans:property name="supportedMediaTypes" value = "application/json;UTF-8" />

    </beans:bean>

     <!-- NOTE(review): the sibling beans use the 'test.' package prefix, so "testBatchRestTemplate" is
          presumably meant to be "test.BatchRestTemplate"; confirm against the real package name. -->
     <beans:bean  id="batchRestTemplate" class="testBatchRestTemplate" >
       <beans:constructor-arg name="requestTimeOut" value="${requestTimeOut}"/>
        <beans:constructor-arg name="maxConnectionPerRoute" value="${maxConnectionPerRoute}"/>
        <!-- Must be 'value', not 'ref': with 'ref' the resolved placeholder (a number) would be looked up
             as a bean name instead of being passed as the Integer constructor argument. -->
        <beans:constructor-arg name="totalMaxConnections" value="${totalMaxConnections}"/>
</beans:bean>





<!-- Outbound header mapper: maps the message headers listed in ${mapHeaders} onto the HTTP request,
     with an empty prefix for user-defined headers. -->
<beans:bean id="headerMapper" class="org.springframework.integration.http.support.DefaultHttpHeaderMapper"
            factory-method="outboundMapper">
<beans:property name="outboundHeaderNames" value="${mapHeaders}"/>
<beans:property name="userDefinedHeaderPrefix" value=""/>
</beans:bean>

<channel id="output" />
<channel id="input" />
<channel id="inputX" />
<channel id="toHttp" />

        </beans:beans>



/**
 * {@link RestTemplate} backed by an Apache HttpClient with a pooling connection manager.
 *
 * <p>The timeout and pool sizes are taken from the constructor arguments. Previously the
 * arguments were accepted but ignored and the values 30000 / 250 / 100 were hard-coded; those
 * values are now only used as fallbacks when an argument is {@code null} or not positive.
 *
 * <p>SECURITY NOTE(review): the HTTPS socket factory below only enables {@code TLSv1} and
 * disables hostname verification ({@code ALLOW_ALL_HOSTNAME_VERIFIER}). Both are kept as in the
 * original so existing endpoints keep working, but they should be reviewed before production use.
 */
public class BatchRestTemplate extends RestTemplate {

    private static final Logger LOGGER = LoggerFactory
            .getLogger(BatchRestTemplate.class);

    /** Fallback for connect, connection-request and socket timeouts, in milliseconds. */
    private static final int DEFAULT_TIMEOUT_MILLIS = 30000;

    /** Fallback for the total number of pooled connections. */
    private static final int DEFAULT_TOTAL_MAX_CONNECTIONS = 250;

    /** Fallback for the number of pooled connections per route. */
    private static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = 100;

    /**
     * Creates the template and registers a UTF-8 {@code StringHttpMessageConverter} ahead of the
     * default converters.
     *
     * @param requestTimeOut        timeout in milliseconds applied to connect, connection-request
     *                              and socket timeouts; {@code null} or non-positive uses 30000
     * @param totalMaxConnections   maximum connections in the pool; {@code null} or non-positive
     *                              uses 250
     * @param maxConnectionPerRoute maximum connections per route; {@code null} or non-positive
     *                              uses 100
     * @throws NoSuchAlgorithmException if the default {@link SSLContext} cannot be obtained
     */
    public BatchRestTemplate(Integer requestTimeOut, Integer totalMaxConnections, Integer maxConnectionPerRoute)
            throws NoSuchAlgorithmException {
        // The factory method is static, so it can be invoked inside the super(...) call.
        super(createBatchHttpRequestFactory(requestTimeOut, totalMaxConnections, maxConnectionPerRoute));
        List<HttpMessageConverter<?>> messageConverters =
                new ArrayList<HttpMessageConverter<?>>(getMessageConverters());
        // Index 0 so the UTF-8 String converter is consulted before the defaults.
        messageConverters.add(0, new StringHttpMessageConverter(Charset.forName("UTF-8")));
        setMessageConverters(messageConverters);
    }

    /**
     * Builds the request factory around a pooled {@code CloseableHttpClient}.
     *
     * @param requestTimeOut        timeout in milliseconds, or {@code null} for the default
     * @param totalMaxConnections   pool-wide connection limit, or {@code null} for the default
     * @param maxConnectionPerRoute per-route connection limit, or {@code null} for the default
     * @return a request factory using the configured client
     * @throws NoSuchAlgorithmException if the default {@link SSLContext} cannot be obtained
     */
    private static ClientHttpRequestFactory createBatchHttpRequestFactory(Integer requestTimeOut,
            Integer totalMaxConnections, Integer maxConnectionPerRoute) throws NoSuchAlgorithmException {

        int timeoutMillis = positiveOrDefault(requestTimeOut, DEFAULT_TIMEOUT_MILLIS);
        int maxTotal = positiveOrDefault(totalMaxConnections, DEFAULT_TOTAL_MAX_CONNECTIONS);
        int maxPerRoute = positiveOrDefault(maxConnectionPerRoute, DEFAULT_MAX_CONNECTIONS_PER_ROUTE);

        // SECURITY NOTE(review): TLSv1 only + no hostname verification; see class comment.
        SSLConnectionSocketFactory socketFactory = new SSLConnectionSocketFactory(
                SSLContext.getDefault(),
                new String[] {"TLSv1"},
                null,
                SSLConnectionSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);

        Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create()
                .register("http", PlainConnectionSocketFactory.getSocketFactory())
                .register("https", socketFactory)
                .build();

        PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
        cm.setMaxTotal(maxTotal);
        cm.setDefaultMaxPerRoute(maxPerRoute);
        // The former cm.closeExpiredConnections() call was dropped: a freshly created pool holds
        // no connections, so it had no effect here.

        RequestConfig requestConfig = RequestConfig.custom()
                .setConnectTimeout(timeoutMillis)
                .setConnectionRequestTimeout(timeoutMillis)
                .setSocketTimeout(timeoutMillis)
                .build();

        CloseableHttpClient httpClient = HttpClients.custom()
                .setDefaultRequestConfig(requestConfig)
                .setConnectionManager(cm)
                .build();

        return new HttpComponentsClientHttpRequestFactory(httpClient);
    }

    /** Returns {@code value} when it is non-null and positive, otherwise {@code fallback}. */
    private static int positiveOrDefault(Integer value, int fallback) {
        return (value != null && value.intValue() > 0) ? value.intValue() : fallback;
    }

}

响应拦截器

public class ResponseInterceptor {

    private HTTPInputProperties inputProperties;
    private static final Logger LOGGER = LoggerFactory.getLogger(ResponseInterceptor.class);

    /**
     * Intercepts the errorMessage from the API response and sends appropriate
     * information to the Output channel.
     * 
     * @param errorMessage
     * @return Message
     */
    public Message<String> transform(Message<MessagingException> errorMessage) {

        LOGGER.error("Inside Response Interceptor !");
        Message<String> responseMessage = null;
        try {
            if (null != errorMessage && null != errorMessage.getPayload()
                    && null != errorMessage.getPayload().getCause()) {
                LOGGER.error("Cause is - " + errorMessage.getPayload().getCause().getMessage());
                if (errorMessage.getPayload().getCause() instanceof HttpClientErrorException) {

                    HttpClientErrorException clientError = (HttpClientErrorException) errorMessage.getPayload()
                            .getCause();
                    LOGGER.error("Error in ResponseInceptor", clientError);
                    List<String> errorCodeList = getErrorCodes(inputProperties.getNonRetryErrorCodes());
                    // intercept Only those errors that are defined as
                    // nonRetryErrorCodes options in stream definition
                    if (null != clientError.getStatusCode()
                            && errorCodeList.contains(clientError.getStatusCode().toString())) {

                        LOGGER.error("Error in Response Body", clientError.getResponseBodyAsString());
                        LOGGER.debug("Non retry message found. Sending to output channel without retrying");

                        responseMessage = MessageBuilder.withPayload((null == clientError.getResponseBodyAsString() || clientError.getResponseBodyAsString().isEmpty()) 
                                ? getDefaultPayload(clientError.getStatusCode().toString()) : clientError.getResponseBodyAsString())
                                .setHeader(BatchHttpClientConstants.HTTP_STATUS, clientError.getStatusCode().toString())
                                .setHeader(BatchHttpClientConstants.REQUEST_OBJECT,
                                        getFailedMessagePayload(errorMessage))
                                .copyHeaders(errorMessage.getPayload().getFailedMessage().getHeaders())
                                .setReplyChannelName(BatchHttpClientConstants.OUTPUT).setErrorChannelName(null).build();

                    } else {
                        LOGGER.debug("Status code from API is not present in the nonRetryCodes");
                    }
                } else if (errorMessage.getPayload().getCause() instanceof HttpServerErrorException) {

                    LOGGER.error("Error is Instance of HttpServerErrorException");
                    HttpServerErrorException serverError = (HttpServerErrorException) errorMessage.getPayload()
                            .getCause();

                    responseMessage = MessageBuilder
                            .withPayload((null == serverError.getResponseBodyAsString()
                            || serverError.getResponseBodyAsString().isEmpty())
                            ? getDefaultPayload(serverError.getStatusCode().toString())
                            : serverError.getResponseBodyAsString())
                            .setHeader(BatchHttpClientConstants.HTTP_STATUS, serverError.getStatusCode().toString())
                            .setHeader(BatchHttpClientConstants.REQUEST_OBJECT, getFailedMessagePayload(errorMessage))
                            .copyHeaders(errorMessage.getPayload().getFailedMessage().getHeaders())
                            .setReplyChannelName(BatchHttpClientConstants.OUTPUT).setErrorChannelName(null).build();


                }

            }
        } catch (Exception exception) {
            LOGGER.error("Exception occured while transforming errorResponse", exception);
        }

        // returning null will send the message back to previous module
        return responseMessage;
    }

    private String getDefaultPayload(String httpStatusCode) {

        JSONObject jsonResponse = new JSONObject();
        if (BatchHttpClientConstants.INTERNAL_SERVER_ERROR.equalsIgnoreCase(httpStatusCode)) {
            jsonResponse.put(BatchHttpClientConstants.ID, BatchHttpClientConstants.INTERNAL_SERVER_ERROR_SUBCODE);
            jsonResponse.put(BatchHttpClientConstants.TEXT, "Internal Server Error");
        } else if (BatchHttpClientConstants.RESOURCE_NOT_FOUND.equalsIgnoreCase(httpStatusCode)) {
            jsonResponse.put(BatchHttpClientConstants.ID, BatchHttpClientConstants.RESOURCE_NOT_FOUND_SUBCODE);
            jsonResponse.put(BatchHttpClientConstants.TEXT, "Empty Response From the API");
        }else{
            jsonResponse.put(BatchHttpClientConstants.ID, BatchHttpClientConstants.GENERIC_ERROR_SUBCODE);
            jsonResponse.put(BatchHttpClientConstants.TEXT, "Generic Error Occured.");
        }

        return jsonResponse.toString();

    }

    /**
     * Get Individual error codes using delimiter
     * 
     * @param nonRetryErrorCodes
     * @return List of Error Codes as string
     */
    private List<String> getErrorCodes(String nonRetryErrorCodes) {

        List<String> errorCodeList = new ArrayList<String>();
        StringTokenizer st = new StringTokenizer(nonRetryErrorCodes, BatchHttpClientConstants.DELIMITER);
        while (st.hasMoreElements()) {
            errorCodeList.add(st.nextToken());
        }
        return errorCodeList;
    }

    /**
     * returns failed Message Payload
     * 
     * @param errorMessage
     * @return String
     * @throws UnsupportedEncodingException
     */
    private byte[] getFailedMessagePayload(Message<MessagingException> errorMessage)
            throws UnsupportedEncodingException {

        if (null != errorMessage.getPayload().getFailedMessage()
                && null != errorMessage.getPayload().getFailedMessage().getPayload()) {
            return errorMessage.getPayload().getFailedMessage().getPayload().toString()
                    .getBytes(BatchHttpClientConstants.UTF_8);
        }

        return "".getBytes(BatchHttpClientConstants.UTF_8);
    }

    public HTTPInputProperties getInputProperties() {
        return inputProperties;
    }

    public void setInputProperties(HTTPInputProperties inputProperties) {
        this.inputProperties = inputProperties;
    }

}

我可以推荐一个 <aggregator> 作为诊断工具。

  1. 将消息发送到 <int-http:outbound-gateway>(或者更好的做法是在流程开始处,即在 input 通道上发送)。

  2. 并将该消息也发送给 <aggregator>

  3. 消息中的一些 key 应该用作 correlationKey

  4. 预计来自 HTTP 网关的 reply 作为组中要发布的第二条消息。

  5. ReleaseStrategy是基于size = 2的标准MessageCountReleaseStrategy

  6. 这里是 <aggregator> 的主要技巧:group-timeout 应该比 socket timeout 稍长一点。"未完成"的组(只有请求、没有回复)应该被丢弃到另一个通道,在那里您就能报告那些未送达的消息,并向您的 REST 服务方询问发生了什么。