How to implement an HTTP sink correctly?

I want to send the results of my DataStream computation to another service over HTTP. I see two possible ways to implement this:

  1. Use the synchronous Apache HttpClient in the sink
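import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.accumulators.Histogram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.http.Consts;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;

public class SyncHttpSink extends RichSinkFunction<SessionItem> {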
    private static final String URL = "http://httpbin.org/post";

    private CloseableHttpClient httpClient;

    private Histogram httpStatusesAccumulator;

    @Override
    public void open(Configuration parameters) throws Exception {
        httpClient = HttpClients.custom()
            .setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE)
            .build();

        httpStatusesAccumulator = getRuntimeContext().getHistogram("http_statuses");
    }

    @Override
    public void close() throws Exception {
        httpClient.close();

        httpStatusesAccumulator.resetLocal();
    }

    @Override
    public void invoke(SessionItem value) throws Exception {
        List<NameValuePair> params = new ArrayList<>();
        params.add(new BasicNameValuePair("session_uid", value.getSessionUid()));
        params.add(new BasicNameValuePair("traffic", String.valueOf(value.getTraffic())));
        params.add(new BasicNameValuePair("duration", String.valueOf(value.getDuration())));

        UrlEncodedFormEntity entity = new UrlEncodedFormEntity(params, Consts.UTF_8);

        HttpPost httpPost = new HttpPost(URL);
        httpPost.setEntity(entity);

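        try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
            int httpStatusCode = response.getStatusLine().getStatusCode();

            httpStatusesAccumulator.add(httpStatusCode);

            // Read the body fully so the kept-alive connection can be reused instead of being closed
            org.apache.http.util.EntityUtils.consume(response.getEntity());
        }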
    }
}
  2. Use the asynchronous Apache HttpAsyncClient in the sink
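import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.accumulators.Histogram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.http.Consts;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.message.BasicNameValuePair;

public class AsyncHttpSink extends RichSinkFunction<SessionItem> {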
    private static final String URL = "http://httpbin.org/post";

    private CloseableHttpAsyncClient httpClient;

    private Histogram httpStatusesAccumulator;

    @Override
    public void open(Configuration parameters) throws Exception {
        httpClient = HttpAsyncClients.custom()
                .setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE)
                .build();
        httpClient.start();

        httpStatusesAccumulator = getRuntimeContext().getHistogram("http_statuses");
    }

    @Override
    public void close() throws Exception {
        httpClient.close();

        httpStatusesAccumulator.resetLocal();
    }

    @Override
    public void invoke(SessionItem value) throws Exception {
        List<NameValuePair> params = new ArrayList<>();
        params.add(new BasicNameValuePair("session_uid", value.getSessionUid()));
        params.add(new BasicNameValuePair("traffic", String.valueOf(value.getTraffic())));
        params.add(new BasicNameValuePair("duration", String.valueOf(value.getDuration())));

        UrlEncodedFormEntity entity = new UrlEncodedFormEntity(params, Consts.UTF_8);

        HttpPost httpPost = new HttpPost(URL);
        httpPost.setEntity(entity);

        httpClient.execute(httpPost, new FutureCallback<HttpResponse>() {
            @Override
            public void completed(HttpResponse response) {
                int httpStatusCode = response.getStatusLine().getStatusCode();

                httpStatusesAccumulator.add(httpStatusCode);
            }

            @Override
            public void failed(Exception ex) {
                httpStatusesAccumulator.add(-1); // -1 - failed
            }

            @Override
            public void cancelled() {
                httpStatusesAccumulator.add(-2); // -2 - cancelled
            }
        });
    }
}

Questions:

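  1. Should I use a synchronous or an asynchronous HTTP client in the sink?

  2. If I use the synchronous client, it will block the sink and, through backpressure, Flink will block the source. Right?

  3. If I use the asynchronous client, it won't block the sink. Right?

  4. Are accumulators not thread-safe? I.e., can I use them in an async callback?

  5. Is the RuntimeContext not thread-safe? I.e., can I use it in an async callback?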

1. Should I use a synchronous or an asynchronous HTTP client in the sink?

To avoid the backpressure caused by blocking HTTP calls, I would recommend using the asynchronous HTTP client.
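Keep in mind that a sink which only fires off async requests puts no limit on how many are outstanding, and the question's close() does not wait for outstanding requests to finish. A minimal JDK-only sketch of one way to address both, assuming the HTTP call is simulated by a CompletableFuture running on an executor that stands in for the client's I/O threads; BoundedAsyncSender, send(), flush() and demo() are hypothetical names, not part of Flink or HttpAsyncClient:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical helper: bounds the number of in-flight async requests and can wait for all of them.
class BoundedAsyncSender implements AutoCloseable {
    private final int maxInFlight;
    private final Semaphore permits;
    private final AtomicInteger succeeded = new AtomicInteger(); // thread-safe counters,
    private final AtomicInteger failed = new AtomicInteger();    // updated from callback threads

    BoundedAsyncSender(int maxInFlight) {
        this.maxInFlight = maxInFlight;
        this.permits = new Semaphore(maxInFlight);
    }

    // Would be called from invoke(): returns once the request is started, but blocks
    // (i.e. applies backpressure) while maxInFlight requests are still outstanding.
    void send(Supplier<CompletableFuture<Integer>> request) throws InterruptedException {
        permits.acquire();
        CompletableFuture<Integer> future;
        try {
            future = request.get();
        } catch (RuntimeException e) {
            permits.release(); // the request never started, so give the permit back
            throw e;
        }
        future.whenComplete((status, error) -> {
            if (error == null && status != null && status >= 200 && status < 300) {
                succeeded.incrementAndGet();
            } else {
                failed.incrementAndGet();
            }
            permits.release(); // released last, so flush() sees the updated counters
        });
    }

    // Would be called from close(): waits until every outstanding request has completed.
    void flush() throws InterruptedException {
        permits.acquire(maxInFlight);
        permits.release(maxInFlight);
    }

    @Override
    public void close() throws InterruptedException {
        flush();
    }

    int succeeded() { return succeeded.get(); }

    int failed() { return failed.get(); }

    // Sends 100 simulated requests with at most 8 in flight; every 10th one "returns" HTTP 500.
    static int[] demo() throws InterruptedException {
        ExecutorService io = Executors.newFixedThreadPool(4); // stands in for the client's I/O threads
        try (BoundedAsyncSender sender = new BoundedAsyncSender(8)) {
            for (int i = 0; i < 100; i++) {
                final int n = i;
                sender.send(() -> CompletableFuture.supplyAsync(() -> n % 10 == 0 ? 500 : 200, io));
            }
            sender.flush();
            return new int[] {sender.succeeded(), sender.failed()};
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] counts = demo();
        System.out.println("succeeded=" + counts[0] + " failed=" + counts[1]);
    }
}
```

The bound deliberately reintroduces backpressure, but only while maxInFlight requests are already outstanding. In a real sink, send() would be called from invoke() and flush() from close(); flushing at checkpoints as well (for example from snapshotState() of Flink's CheckpointedFunction) is the usual way to get at-least-once delivery, which this sketch does not cover.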

2. If I use the synchronous client, it will block the sink and, through backpressure, Flink will block the source. Right?

Yes, that's correct. The backpressure will propagate through your topology all the way back to the sources.
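A toy JDK-only model of that propagation, assuming a bounded ArrayBlockingQueue as a stand-in for Flink's network buffers and Thread.sleep() as a stand-in for the blocking HTTP call (BackpressureDemo and sourceTimeMillis() are hypothetical names). Once the buffer is full, put() blocks, so the source is slowed down to the pace of the sink:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy model of backpressure: a fast source feeding a slow, blocking sink through a bounded buffer.
class BackpressureDemo {
    private static final int END = -1; // marker telling the sink to stop

    // Returns how many milliseconds the source spent emitting `records` records.
    static long sourceTimeMillis(int records, int bufferSize, long sinkDelayMs) throws InterruptedException {
        // Stands in for the bounded network buffers between Flink tasks.
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(bufferSize);

        Thread sink = new Thread(() -> {
            try {
                while (buffer.take() != END) {
                    Thread.sleep(sinkDelayMs); // stands in for a blocking, synchronous HTTP call
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sink.start();

        long start = System.nanoTime();
        for (int i = 0; i < records; i++) {
            buffer.put(i); // blocks while the buffer is full: the slow sink slows down the source
        }
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

        buffer.put(END);
        sink.join();
        return elapsedMs;
    }

    public static void main(String[] args) throws InterruptedException {
        // Same slow sink (10 ms per record), small vs. large buffer.
        System.out.println("buffer of 5:  source took " + sourceTimeMillis(20, 5, 10) + " ms");
        System.out.println("buffer of 64: source took " + sourceTimeMillis(20, 64, 10) + " ms");
    }
}
```

With the small buffer the source needs roughly as long as the sink takes to work through the records that do not fit into the buffer; with a buffer larger than the input it finishes almost immediately.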

3. If I use the asynchronous client, it won't block the sink. Right?

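That is correct: with the callback variant of execute(), invoke() returns as soon as the request has been handed to the client.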

4. Are accumulators not thread-safe? I.e., can I use them in an async callback?

Accumulators are not thread-safe, so access to them must be synchronized.
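One way to do that is to guard every access with a single lock, both in the completed(), failed() and cancelled() callbacks and anywhere else the accumulator is touched. A minimal sketch, assuming StatusHistogram as a hypothetical, deliberately non-thread-safe stand-in for Flink's Histogram and a thread pool in place of the HTTP client's I/O threads (SynchronizedStatusRecorder and simulate() are also hypothetical names):

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical, deliberately non-thread-safe stand-in for an accumulator such as Flink's Histogram.
class StatusHistogram {
    private final Map<Integer, Integer> counts = new TreeMap<>();

    void add(int status) { counts.merge(status, 1, Integer::sum); }

    int count(int status) { return counts.getOrDefault(status, 0); }
}

class SynchronizedStatusRecorder {
    private final StatusHistogram histogram = new StatusHistogram();
    private final Object lock = new Object(); // guards every access to `histogram`

    // What completed()/failed()/cancelled() would call from the HTTP client's I/O threads.
    void record(int status) {
        synchronized (lock) {
            histogram.add(status);
        }
    }

    // Reads go through the same lock.
    int count(int status) {
        synchronized (lock) {
            return histogram.count(status);
        }
    }

    // Simulates `callbacks` responses arriving concurrently on `threads` threads.
    static SynchronizedStatusRecorder simulate(int callbacks, int threads) throws InterruptedException {
        SynchronizedStatusRecorder recorder = new SynchronizedStatusRecorder();
        ExecutorService io = Executors.newFixedThreadPool(threads);
        CountDownLatch done = new CountDownLatch(callbacks);
        for (int i = 0; i < callbacks; i++) {
            final int status = (i % 2 == 0) ? 200 : -1; // -1 = failed, as in the question's code
            io.execute(() -> {
                recorder.record(status);
                done.countDown();
            });
        }
        done.await();
        io.shutdown();
        return recorder;
    }

    public static void main(String[] args) throws InterruptedException {
        SynchronizedStatusRecorder recorder = simulate(10_000, 8);
        System.out.println("200: " + recorder.count(200) + ", failed: " + recorder.count(-1));
    }
}
```

Without the synchronized blocks, concurrent merge() calls on the TreeMap could lose updates or corrupt the map.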

5. Is the RuntimeContext not thread-safe? I.e., can I use it in an async callback?

The RuntimeContext is not thread-safe, so access to it must be synchronized.
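An alternative to locking around RuntimeContext-owned objects is to keep them on the task thread altogether: the callbacks only publish results to a thread-safe queue, and the task thread drains that queue (for example at the start of invoke() and in close()) and updates accumulators or metrics itself. A minimal JDK-only sketch of that hand-off; CallbackResults, publish(), drainTo() and demo() are hypothetical names, and plain threads stand in for the HTTP client's callback threads:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.IntConsumer;

// Hand-off between callback threads and the task thread: callbacks never touch
// task-owned objects (RuntimeContext, accumulators); they only enqueue results.
class CallbackResults {
    private final Queue<Integer> completed = new ConcurrentLinkedQueue<>();

    // Safe to call from any thread, e.g. from an async HTTP callback.
    void publish(int status) {
        completed.offer(status);
    }

    // Call only from the task thread (e.g. at the start of invoke() and in close());
    // `consumer` stands in for code that updates accumulators or metrics.
    int drainTo(IntConsumer consumer) {
        int drained = 0;
        Integer status;
        while ((status = completed.poll()) != null) {
            consumer.accept(status);
            drained++;
        }
        return drained;
    }

    // Four "I/O threads" publish 250 results each; the calling thread then drains them.
    static List<Integer> demo() throws InterruptedException {
        CallbackResults results = new CallbackResults();
        List<Thread> ioThreads = new ArrayList<>();
        for (int t = 0; t < 4; t++) {
            Thread io = new Thread(() -> {
                for (int i = 0; i < 250; i++) {
                    results.publish(200);
                }
            });
            ioThreads.add(io);
            io.start();
        }
        for (Thread io : ioThreads) {
            io.join(); // in a real sink: wait for outstanding requests before the final drain
        }
        List<Integer> seenByTaskThread = new ArrayList<>(); // only this thread touches the list
        results.drainTo(seenByTaskThread::add);
        return seenByTaskThread;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("drained " + demo().size() + " results on the task thread");
    }
}
```

In close(), wait for outstanding requests to complete before the final drain; otherwise results delivered by late callbacks are never recorded.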