Windowing is not triggered after deploying the Flink application to Kinesis Data Analytics

We have an Apache Flink POC application which works fine locally, but after we deploy it into Kinesis Data Analytics (KDA), it does not emit any records to the sink.

Used technologies

Local

AWS

Application logic

  1. A FlinkKafkaConsumer reads the messages in JSON format from the topic
  2. The JSONs are mapped to domain objects, called Telemetry
private static DataStream<Telemetry> SetupKafkaSource(StreamExecutionEnvironment environment){
    Properties kafkaProperties = new Properties();
    kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "BROKER1_ADDRESS.amazonaws.com:9092");
    kafkaProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink_consumer");

    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("THE_TOPIC", new SimpleStringSchema(), kafkaProperties);

    consumer.setStartFromEarliest(); //Just for repeatable testing

    return environment
            .addSource(consumer)
            .map(new MapJsonToTelemetry());
}
  3. Telemetry's timestamp is chosen as the event timestamp
    3.1. using forMonotonousTimestamps
  4. Telemetry's StateIso is used for keyBy
    4.1. the two-letter ISO code of a state of the USA
  5. A 5-second tumbling window strategy is applied
private static SingleOutputStreamOperator<StateAggregatedTelemetry> SetupProcessing(DataStream<Telemetry> telemetries) {
    WatermarkStrategy<Telemetry> wmStrategy =
            WatermarkStrategy
                    .<Telemetry>forMonotonousTimestamps()
                    .withTimestampAssigner((event, timestamp) -> event.TimeStamp);

    return telemetries
            .assignTimestampsAndWatermarks(wmStrategy)
            .keyBy(t -> t.StateIso)
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .process(new WindowCountFunction());
}
  6. A custom ProcessWindowFunction is called to perform some basic aggregation
    6.1. We calculate a single StateAggregatedTelemetry per key and window
  7. ElasticSearch is configured as the sink
    7.1. The StateAggregatedTelemetry data is mapped into a HashMap and set as the source of the index request
    7.2. All setBulkFlushXYZ methods are set to low values
private static void SetupElasticSearchSink(SingleOutputStreamOperator<StateAggregatedTelemetry> telemetries) {
    List<HttpHost> httpHosts = new ArrayList<>();
    httpHosts.add(HttpHost.create("https://ELKCLUSTER_ADDRESS.amazonaws.com:443"));

    ElasticsearchSink.Builder<StateAggregatedTelemetry> esSinkBuilder = new ElasticsearchSink.Builder<>(
            httpHosts,
            (ElasticsearchSinkFunction<StateAggregatedTelemetry>) (element, ctx, indexer) -> {
                Map<String, Object> record = new HashMap<>();

                record.put("stateIso", element.StateIso);
                record.put("healthy", element.Flawless);
                record.put("unhealthy", element.Faulty);
                ...

                LOG.info("Telemetry has been added to the buffer");
                indexer.add(Requests.indexRequest()
                        .index("INDEXPREFIX-"+ from.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")))
                        .source(record, XContentType.JSON));
            }
    );

    //Using low values to make sure that the Flush will happen
    esSinkBuilder.setBulkFlushMaxActions(25);
    esSinkBuilder.setBulkFlushInterval(1000);
    esSinkBuilder.setBulkFlushMaxSizeMb(1);
    esSinkBuilder.setBulkFlushBackoff(true);
    esSinkBuilder.setRestClientFactory(restClientBuilder -> {});

    LOG.info("Sink has been attached to the DataStream");
    telemetries.addSink(esSinkBuilder.build());
}

Things we have ruled out

Connectivity to the ElasticSearch cluster works from KDA; the sink pings the cluster successfully at startup:

{
    "locationInformation": "org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.verifyClientConnection(Elasticsearch7ApiCallBridge.java:135)",
    "logger": "org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge",
    "message": "Pinging Elasticsearch cluster via hosts [https://...es.amazonaws.com:443] ...",
    "threadName": "Window(TumblingEventTimeWindows(5000), EventTimeTrigger, WindowCountFunction) -> (Sink: Print to Std. Out, Sink: Unnamed, Sink: Print to Std. Out) (2/2)",
    "applicationARN": "arn:aws:kinesisanalytics:...",
    "applicationVersionId": "39",
    "messageSchemaVersion": "1",
    "messageType": "INFO"
}

Response

{
    "locationInformation": "org.elasticsearch.client.RequestLogger.logResponse(RequestLogger.java:59)",
    "logger": "org.elasticsearch.client.RestClient",
    "message": "request [HEAD https://...es.amazonaws.com:443/] returned [HTTP/1.1 200 OK]",
    "threadName": "Window(TumblingEventTimeWindows(5000), EventTimeTrigger, WindowCountFunction) -> (Sink: Print to Std. Out, Sink: Unnamed, Sink: Print to Std. Out) (2/2)",
    "applicationARN": "arn:aws:kinesisanalytics:...",
    "applicationVersionId": "39",
    "messageSchemaVersion": "1",
    "messageType": "DEBUG"
}

Things we have tried without luck


Our assumption is that we don't see any data on the sink side because the window processing logic is not triggered. That would also explain why we don't see any processing logs in CloudWatch.

Any help would be much appreciated!


Update #1


Update #2

The average size of a message is around 4 KB. Here is an excerpt from a sample message:

{
  "affiliateCode": "...",
  "appVersion": "1.1.14229",
  "clientId": "guid",
  "clientIpAddr": "...",
  "clientOriginated": true,
  "connectionType": "Cable/DSL",
  "countryCode": "US",
  "design": "...",
  "device": "...",
  ...
  "deviceSerialNumber": "...",
  "dma": "UNKNOWN",
  "eventSource": "...",
  "firstRunTimestamp": 1609091112818,
  "friendlyDeviceName": "Comcast",
  "fullDevice": "Comcast ...",
  "geoInfo": {
    "continent": {
      "code": "NA",
      "geoname_id": 120
    },
    "country": {
      "geoname_id": 123,
      "iso_code": "US"
    },
    "location": {
      "accuracy_radius": 100,
      "latitude": 37.751,
      "longitude": -97.822,
      "time_zone": "America/Chicago"
    },
    "registered_country": {
      "geoname_id": 123,
      "iso_code": "US"
    }
  },
  "height": 720,
  "httpUserAgent": "Mozilla/...",
  "isLoggedIn": true,
  "launchCount": 19,
  "model": "...",
  "os": "Comcast...",
  "osVersion": "...",
  ...
  "platformTenantCode": "...",
  "productCode": "...",
  "requestOrigin": "https://....com",
  "serverTimeUtc": 1617809474787,
  "serviceCode": "...",
  "serviceOriginated": false,
  "sessionId": "guid",
  "sessionSequence": 2,
  "subtype": "...",
  "tEventId": "...",
  ...
  "tRegion": "us-east-1",
  "timeZoneOffset": 5,
  "timestamp": 1617809473305,
  "traits": {
    "isp": "Comcast Cable",
    "organization": "..."
  },
  "type": "...",
  "userId": "guid",
  "version": "v1",
  "width": 1280,
  "xb3traceId": "guid"
}

We use an ObjectMapper to parse only some of the fields of the JSON. Here is what the Telemetry class looks like:

public class Telemetry {
    public String AppVersion;
    public String CountryCode;
    public String ClientId;
    public String DeviceSerialNumber;
    public String EventSource;
    public String SessionId;
    public TelemetrySubTypes SubType; //enum
    public String TRegion;
    public Long TimeStamp;
    public TelemetryTypes Type; //enum
    public String StateIso;
    
    ...
}
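
For completeness, here is a minimal sketch of what the MapJsonToTelemetry function used in the source setup could look like with Jackson. The actual implementation is not part of the post, and the StateIso extraction in particular is not shown, so the field mapping below is illustrative only:

import org.apache.flink.api.common.functions.MapFunction;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class MapJsonToTelemetry implements MapFunction<String, Telemetry> {
    // ObjectMapper is not serializable, so it is kept transient and
    // created lazily on the task instance instead of in the closure.
    private transient ObjectMapper mapper;

    @Override
    public Telemetry map(String json) throws Exception {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        JsonNode root = mapper.readTree(json);

        Telemetry telemetry = new Telemetry();
        telemetry.AppVersion = root.path("appVersion").asText();
        telemetry.CountryCode = root.path("countryCode").asText();
        telemetry.ClientId = root.path("clientId").asText();
        telemetry.TimeStamp = root.path("timestamp").asLong();
        // ... the remaining fields, including StateIso, are populated similarly
        return telemetry;
    }
}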

Update #3

Source

SubTasks tab

ID  Bytes received  Records received  Bytes sent  Records sent  Status
0   0 B             0                 0 B         0             RUNNING
1   0 B             0                 2.83 MB     15,000        RUNNING

Watermarks tab

No data

Window

SubTasks tab

ID  Bytes received  Records received  Bytes sent  Records sent  Status
0   1.80 MB         9,501             0 B         0             RUNNING
1   1.04 MB         5,499             0 B         0             RUNNING

Watermarks

SubTask  Watermark
1        No Watermark
2        No Watermark

Based on the comments and the additional information you have provided, it seems that the issue is that two Flink consumer instances cannot consume from the same Kafka partition. So in your case, only one parallel instance of the source operator will consume from the Kafka partition, and the other one will stay idle.

In general, a Flink operator selects MIN([all_upstream_parallel_watermarks]), so in your case one Kafka consumer will produce normal watermarks and the other one will never produce anything (Flink assumes Long.MIN_VALUE in that case), so Flink will select the lower one, which is Long.MIN_VALUE. The window will therefore never be fired, because even though data is flowing, the combined watermark never advances. The good practice when consuming from Kafka is to use a parallelism equal to the number of Kafka partitions.
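
As a side note, if an idle consumer subtask were holding back the watermark like this, one common mitigation available since Flink 1.11 is to declare an idleness timeout on the watermark strategy, so that subtasks receiving no data are excluded from the MIN calculation. A sketch based on the strategy from the question (Duration is java.time.Duration; the 30-second timeout is an arbitrary example value):

WatermarkStrategy<Telemetry> wmStrategy =
        WatermarkStrategy
                .<Telemetry>forMonotonousTimestamps()
                .withTimestampAssigner((event, timestamp) -> event.TimeStamp)
                // Subtasks that see no events for 30 seconds are marked idle
                // and no longer hold back the downstream MIN watermark.
                .withIdleness(Duration.ofSeconds(30));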

After a support session with the AWS folks, it turned out that we had failed to set the time characteristic on the streaming environment.

  • In 1.11.1 the default value of TimeCharacteristic was ProcessingTime
  • Since 1.12 (see the related release notes) the default value is EventTime:

In Flink 1.12 the default stream time characteristic has been changed to EventTime, thus you don’t need to call this method for enabling event-time support anymore.

So after we explicitly set EventTime, it started generating watermarks like a charm:

streamingEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
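
For reference, this is roughly where the call sits in the job setup, using the method names from the snippets above; the main class wiring itself was not included in the post, so this is a sketch, and the job name is arbitrary:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment streamingEnv = StreamExecutionEnvironment.getExecutionEnvironment();

    // Without this line, the Flink 1.11 runtime in KDA did not run in event time,
    // so no watermarks were emitted and the event-time windows never fired.
    streamingEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    DataStream<Telemetry> telemetries = SetupKafkaSource(streamingEnv);
    SingleOutputStreamOperator<StateAggregatedTelemetry> aggregated = SetupProcessing(telemetries);
    SetupElasticSearchSink(aggregated);

    streamingEnv.execute("telemetry-window-poc");
}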