当我们将 Flink 应用程序部署到 Kinesis Data Analytics 中时,不会触发窗口化
Windowing is not triggered when we deployed the Flink application into Kinesis Data Analytics
我们有一个 Apache Flink POC 应用程序,它在本地运行良好,但在我们部署到 Kinesis Data Analytics (KDA) 之后,它不会将记录发送到接收器中。
使用的技术
本地
- 来源:卡夫卡2.7
- 1 位经纪人
- 1 个分区为 1 且复制因子为 1 的主题
- 正在处理:Flink 1.12.1
- 接收器:Managed ElasticSearch Service 7.9.1(与 AWS 相同的实例)
AWS
- 来源:亚马逊 MSK Kafka 2.8
- 3 个经纪人(但我们正在连接一个)
- 1 个主题,分区为 1,复制因子 3
- 正在处理:Amazon KDA Flink 1.11.1
- 平行度:2
- 每个 KPU 的并行度:2
- 接收器:托管 ElasticSearch 服务 7.9.1
应用程序逻辑
FlinkKafkaConsumer
从主题 中读取 json 格式的消息
- json 映射到域对象,称为
Telemetry
private static DataStream<Telemetry> SetupKafkaSource(StreamExecutionEnvironment environment){
Properties kafkaProperties = new Properties();
kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "BROKER1_ADDRESS.amazonaws.com:9092");
kafkaProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink_consumer");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("THE_TOPIC", new SimpleStringSchema(), kafkaProperties);
consumer.setStartFromEarliest(); //Just for repeatable testing
return environment
.addSource(consumer)
.map(new MapJsonToTelemetry());
}
- 为 EventTimeStamp 选择了遥测的时间戳。
3.1.随着 forMonotonousTimeStamps
- Telemetry 的
StateIso
用于 keyBy
。
4.1.美国的两个字母iso代码
- 5 秒翻滚window 策略已应用
private static SingleOutputStreamOperator<StateAggregatedTelemetry> SetupProcessing(DataStream<Telemetry> telemetries) {
WatermarkStrategy<Telemetry> wmStrategy =
WatermarkStrategy
.<Telemetry>forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> event.TimeStamp);
return telemetries
.assignTimestampsAndWatermarks(wmStrategy)
.keyBy(t -> t.StateIso)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new WindowCountFunction());
}
- 调用自定义
ProcessWindowFunction
来执行一些基本聚合。
6.1.我们计算单个StateAggregatedTelemetry
- ElasticSearch 配置为接收器。
7.1. StateAggregatedTelemetry
数据映射到 HashMap
并推入 source
.
7.2.所有 setBulkFlushXYZ
方法都设置为低值
private static void SetupElasticSearchSink(SingleOutputStreamOperator<StateAggregatedTelemetry> telemetries) {
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(HttpHost.create("https://ELKCLUSTER_ADDRESS.amazonaws.com:443"));
ElasticsearchSink.Builder<StateAggregatedTelemetry> esSinkBuilder = new ElasticsearchSink.Builder<>(
httpHosts,
(ElasticsearchSinkFunction<StateAggregatedTelemetry>) (element, ctx, indexer) -> {
Map<String, Object> record = new HashMap<>();
record.put("stateIso", element.StateIso);
record.put("healthy", element.Flawless);
record.put("unhealthy", element.Faulty);
...
LOG.info("Telemetry has been added to the buffer");
indexer.add(Requests.indexRequest()
.index("INDEXPREFIX-"+ from.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")))
.source(record, XContentType.JSON));
}
);
//Using low values to make sure that the Flush will happen
esSinkBuilder.setBulkFlushMaxActions(25);
esSinkBuilder.setBulkFlushInterval(1000);
esSinkBuilder.setBulkFlushMaxSizeMb(1);
esSinkBuilder.setBulkFlushBackoff(true);
esSinkBuilder.setRestClientFactory(restClientBuilder -> {});
LOG.info("Sink has been attached to the DataStream");
telemetries.addSink(esSinkBuilder.build());
}
排除的东西
- 我们设法将 Kafka、KDA 和 ElasticSearch 置于相同的 VPC 和相同的子网下,以避免需要对每个请求进行签名
- 从日志中可以看出Flink可以到达ES集群
要求
{
"locationInformation": "org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.verifyClientConnection(Elasticsearch7ApiCallBridge.java:135)",
"logger": "org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge",
"message": "Pinging Elasticsearch cluster via hosts [https://...es.amazonaws.com:443] ...",
"threadName": "Window(TumblingEventTimeWindows(5000), EventTimeTrigger, WindowCountFunction) -> (Sink: Print to Std. Out, Sink: Unnamed, Sink: Print to Std. Out) (2/2)",
"applicationARN": "arn:aws:kinesisanalytics:...",
"applicationVersionId": "39",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
回应
{
"locationInformation": "org.elasticsearch.client.RequestLogger.logResponse(RequestLogger.java:59)",
"logger": "org.elasticsearch.client.RestClient",
"message": "request [HEAD https://...es.amazonaws.com:443/] returned [HTTP/1.1 200 OK]",
"threadName": "Window(TumblingEventTimeWindows(5000), EventTimeTrigger, WindowCountFunction) -> (Sink: Print to Std. Out, Sink: Unnamed, Sink: Print to Std. Out) (2/2)",
"applicationARN": "arn:aws:kinesisanalytics:...",
"applicationVersionId": "39",
"messageSchemaVersion": "1",
"messageType": "DEBUG"
}
- 我们还可以通过查看 Flink 仪表板来验证消息是否已从 Kafka 主题读取并发送以供处理
我们已经尝试过但没有运气的东西
- 我们实现了一个
RichParallelSourceFunction
发出 1_000_000 消息然后退出
- 这在本地环境中运行良好
- 作业在AWS环境完成,但sink端没有数据
- 我们实现了另一个
RichParallelSourceFunction
,它每秒发出 100 条消息
- 基本上我们有两个循环,一个
while(true)
外循环和 for
内循环
- 在内部循环之后我们调用了
Thread.sleep(1000)
- 这在本地环境下运行得非常好
- 但在 AWS 中我们可以看到检查点的大小不断增长,并且在 ELK 中没有出现任何消息
- 我们已经尝试 运行 具有不同并行设置的 KDA 应用程序
- 但没有区别
- 我们还尝试了不同的水印策略(
forBoundedOutOfOrderness
、withIdle
、noWatermarks
)
- 但没有区别
- 我们为
ProcessWindowFunction
和 ElasticsearchSinkFunction
添加了日志
- 每当我们 运行 来自 IDEA 的应用程序时,这些日志就会出现在控制台上
- 每当我们 运行 带有 KDA 的应用程序时,CloudWatch 中就没有这样的日志
- 那些添加到
main
的日志确实出现在 CloudWatch 日志中
我们假设我们在接收器端看不到数据,因为 window 处理逻辑没有被触发。这就是为什么在 CloudWatch 中看不到处理日志的原因。
我们非常欢迎任何帮助!
更新#1
- 我们已经尝试将 Flink 版本从 1.12.1 降级到 1.11.1
- 没有变化
- 我们已尝试处理时间 window 而不是事件时间
- 它甚至在本地环境中都不起作用
更新#2
邮件的平均大小约为 4kb。以下是示例消息的摘录:
{
"affiliateCode": "...",
"appVersion": "1.1.14229",
"clientId": "guid",
"clientIpAddr": "...",
"clientOriginated": true,
"connectionType": "Cable/DSL",
"countryCode": "US",
"design": "...",
"device": "...",
...
"deviceSerialNumber": "...",
"dma": "UNKNOWN",
"eventSource": "...",
"firstRunTimestamp": 1609091112818,
"friendlyDeviceName": "Comcast",
"fullDevice": "Comcast ...",
"geoInfo": {
"continent": {
"code": "NA",
"geoname_id": 120
},
"country": {
"geoname_id": 123,
"iso_code": "US"
},
"location": {
"accuracy_radius": 100,
"latitude": 37.751,
"longitude": -97.822,
"time_zone": "America/Chicago"
},
"registered_country": {
"geoname_id": 123,
"iso_code": "US"
}
},
"height": 720,
"httpUserAgent": "Mozilla/...",
"isLoggedIn": true,
"launchCount": 19,
"model": "...",
"os": "Comcast...",
"osVersion": "...",
...
"platformTenantCode": "...",
"productCode": "...",
"requestOrigin": "https://....com",
"serverTimeUtc": 1617809474787,
"serviceCode": "...",
"serviceOriginated": false,
"sessionId": "guid",
"sessionSequence": 2,
"subtype": "...",
"tEventId": "...",
...
"tRegion": "us-east-1",
"timeZoneOffset": 5,
"timestamp": 1617809473305,
"traits": {
"isp": "Comcast Cable",
"organization": "..."
},
"type": "...",
"userId": "guid",
"version": "v1",
"width": 1280,
"xb3traceId": "guid"
}
我们使用 ObjectMapper
仅解析 json 的部分字段。这是 Telemetry
class 的样子:
public class Telemetry {
public String AppVersion;
public String CountryCode;
public String ClientId;
public String DeviceSerialNumber;
public String EventSource;
public String SessionId;
public TelemetrySubTypes SubType; //enum
public String TRegion;
public Long TimeStamp;
public TelemetryTypes Type; //enum
public String StateIso;
...
}
更新 #3
来源
子任务选项卡
ID
Bytes received
Records received
Bytes sent
Records sent
Status
0
0 B
0
0 B
0
RUNNING
1
0 B
0
2.83 MB
15,000
RUNNING
水印选项卡
无数据
Window
子任务选项卡
ID
Bytes received
Records received
Bytes sent
Records sent
Status
0
1.80 MB
9,501
0 B
0
RUNNING
1
1.04 MB
5,499
0 B
0
RUNNING
水印
SubTask
Watermark
1
No Watermark
2
No Watermark
根据您提供的评论和更多信息,问题似乎是两个 Flink 消费者无法从同一分区消费。因此,在您的情况下,只有一个运算符的并行实例将从 kafka 分区中消耗,而另一个将处于空闲状态。
一般来说,Flink 运算符会 select MIN([all_downstream_parallel_watermarks])
,所以在你的情况下,一个 Kafka 消费者会产生正常的 Watermarks 而另一个永远不会产生任何东西(在这种情况下,flink 假设 Long.Min
),因此 Flink 将 select 较低的 Long.Min
。因此,window 将永远不会被触发,因为当数据在流动时,永远不会生成一个水印。好的做法是在使用 Kafka 时使用与 Kafka 分区数相同的并行性。
在获得 AWS 人员的支持后 session 发现我们未能在流媒体环境中设置时间特征。
- 在 1.11.1 中,
TimeCharacteristic
的默认值为 IngestionTime
。
- 从 1.12.1 开始(参见相关 release notes)默认值为
EventTime
:
In Flink 1.12 the default stream time characteristic has been changed to EventTime, thus you don’t need to call this method for enabling event-time support anymore.
因此,在我们明确设置 EventTime
之后,它开始像魅力一样生成水印:
streamingEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
我们有一个 Apache Flink POC 应用程序,它在本地运行良好,但在我们部署到 Kinesis Data Analytics (KDA) 之后,它不会将记录发送到接收器中。
使用的技术
本地
- 来源:卡夫卡2.7
- 1 位经纪人
- 1 个分区为 1 且复制因子为 1 的主题
- 正在处理:Flink 1.12.1
- 接收器:Managed ElasticSearch Service 7.9.1(与 AWS 相同的实例)
AWS
- 来源:亚马逊 MSK Kafka 2.8
- 3 个经纪人(但我们正在连接一个)
- 1 个主题,分区为 1,复制因子 3
- 正在处理:Amazon KDA Flink 1.11.1
- 平行度:2
- 每个 KPU 的并行度:2
- 接收器:托管 ElasticSearch 服务 7.9.1
应用程序逻辑
FlinkKafkaConsumer
从主题 中读取 json 格式的消息
- json 映射到域对象,称为
Telemetry
private static DataStream<Telemetry> SetupKafkaSource(StreamExecutionEnvironment environment){
Properties kafkaProperties = new Properties();
kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "BROKER1_ADDRESS.amazonaws.com:9092");
kafkaProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink_consumer");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("THE_TOPIC", new SimpleStringSchema(), kafkaProperties);
consumer.setStartFromEarliest(); //Just for repeatable testing
return environment
.addSource(consumer)
.map(new MapJsonToTelemetry());
}
- 为 EventTimeStamp 选择了遥测的时间戳。
3.1.随着forMonotonousTimeStamps
- Telemetry 的
StateIso
用于keyBy
。
4.1.美国的两个字母iso代码 - 5 秒翻滚window 策略已应用
private static SingleOutputStreamOperator<StateAggregatedTelemetry> SetupProcessing(DataStream<Telemetry> telemetries) {
WatermarkStrategy<Telemetry> wmStrategy =
WatermarkStrategy
.<Telemetry>forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> event.TimeStamp);
return telemetries
.assignTimestampsAndWatermarks(wmStrategy)
.keyBy(t -> t.StateIso)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new WindowCountFunction());
}
- 调用自定义
ProcessWindowFunction
来执行一些基本聚合。
6.1.我们计算单个StateAggregatedTelemetry
- ElasticSearch 配置为接收器。
7.1.StateAggregatedTelemetry
数据映射到HashMap
并推入source
.
7.2.所有setBulkFlushXYZ
方法都设置为低值
private static void SetupElasticSearchSink(SingleOutputStreamOperator<StateAggregatedTelemetry> telemetries) {
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(HttpHost.create("https://ELKCLUSTER_ADDRESS.amazonaws.com:443"));
ElasticsearchSink.Builder<StateAggregatedTelemetry> esSinkBuilder = new ElasticsearchSink.Builder<>(
httpHosts,
(ElasticsearchSinkFunction<StateAggregatedTelemetry>) (element, ctx, indexer) -> {
Map<String, Object> record = new HashMap<>();
record.put("stateIso", element.StateIso);
record.put("healthy", element.Flawless);
record.put("unhealthy", element.Faulty);
...
LOG.info("Telemetry has been added to the buffer");
indexer.add(Requests.indexRequest()
.index("INDEXPREFIX-"+ from.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")))
.source(record, XContentType.JSON));
}
);
//Using low values to make sure that the Flush will happen
esSinkBuilder.setBulkFlushMaxActions(25);
esSinkBuilder.setBulkFlushInterval(1000);
esSinkBuilder.setBulkFlushMaxSizeMb(1);
esSinkBuilder.setBulkFlushBackoff(true);
esSinkBuilder.setRestClientFactory(restClientBuilder -> {});
LOG.info("Sink has been attached to the DataStream");
telemetries.addSink(esSinkBuilder.build());
}
排除的东西
- 我们设法将 Kafka、KDA 和 ElasticSearch 置于相同的 VPC 和相同的子网下,以避免需要对每个请求进行签名
- 从日志中可以看出Flink可以到达ES集群
要求
{
"locationInformation": "org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.verifyClientConnection(Elasticsearch7ApiCallBridge.java:135)",
"logger": "org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge",
"message": "Pinging Elasticsearch cluster via hosts [https://...es.amazonaws.com:443] ...",
"threadName": "Window(TumblingEventTimeWindows(5000), EventTimeTrigger, WindowCountFunction) -> (Sink: Print to Std. Out, Sink: Unnamed, Sink: Print to Std. Out) (2/2)",
"applicationARN": "arn:aws:kinesisanalytics:...",
"applicationVersionId": "39",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
回应
{
"locationInformation": "org.elasticsearch.client.RequestLogger.logResponse(RequestLogger.java:59)",
"logger": "org.elasticsearch.client.RestClient",
"message": "request [HEAD https://...es.amazonaws.com:443/] returned [HTTP/1.1 200 OK]",
"threadName": "Window(TumblingEventTimeWindows(5000), EventTimeTrigger, WindowCountFunction) -> (Sink: Print to Std. Out, Sink: Unnamed, Sink: Print to Std. Out) (2/2)",
"applicationARN": "arn:aws:kinesisanalytics:...",
"applicationVersionId": "39",
"messageSchemaVersion": "1",
"messageType": "DEBUG"
}
- 我们还可以通过查看 Flink 仪表板来验证消息是否已从 Kafka 主题读取并发送以供处理
我们已经尝试过但没有运气的东西
- 我们实现了一个
RichParallelSourceFunction
发出 1_000_000 消息然后退出- 这在本地环境中运行良好
- 作业在AWS环境完成,但sink端没有数据
- 我们实现了另一个
RichParallelSourceFunction
,它每秒发出 100 条消息- 基本上我们有两个循环,一个
while(true)
外循环和for
内循环 - 在内部循环之后我们调用了
Thread.sleep(1000)
- 这在本地环境下运行得非常好
- 但在 AWS 中我们可以看到检查点的大小不断增长,并且在 ELK 中没有出现任何消息
- 基本上我们有两个循环,一个
- 我们已经尝试 运行 具有不同并行设置的 KDA 应用程序
- 但没有区别
- 我们还尝试了不同的水印策略(
forBoundedOutOfOrderness
、withIdle
、noWatermarks
)- 但没有区别
- 我们为
ProcessWindowFunction
和ElasticsearchSinkFunction
添加了日志- 每当我们 运行 来自 IDEA 的应用程序时,这些日志就会出现在控制台上
- 每当我们 运行 带有 KDA 的应用程序时,CloudWatch 中就没有这样的日志
- 那些添加到
main
的日志确实出现在 CloudWatch 日志中
- 那些添加到
我们假设我们在接收器端看不到数据,因为 window 处理逻辑没有被触发。这就是为什么在 CloudWatch 中看不到处理日志的原因。
我们非常欢迎任何帮助!
更新#1
- 我们已经尝试将 Flink 版本从 1.12.1 降级到 1.11.1
- 没有变化
- 我们已尝试处理时间 window 而不是事件时间
- 它甚至在本地环境中都不起作用
更新#2
邮件的平均大小约为 4kb。以下是示例消息的摘录:
{
"affiliateCode": "...",
"appVersion": "1.1.14229",
"clientId": "guid",
"clientIpAddr": "...",
"clientOriginated": true,
"connectionType": "Cable/DSL",
"countryCode": "US",
"design": "...",
"device": "...",
...
"deviceSerialNumber": "...",
"dma": "UNKNOWN",
"eventSource": "...",
"firstRunTimestamp": 1609091112818,
"friendlyDeviceName": "Comcast",
"fullDevice": "Comcast ...",
"geoInfo": {
"continent": {
"code": "NA",
"geoname_id": 120
},
"country": {
"geoname_id": 123,
"iso_code": "US"
},
"location": {
"accuracy_radius": 100,
"latitude": 37.751,
"longitude": -97.822,
"time_zone": "America/Chicago"
},
"registered_country": {
"geoname_id": 123,
"iso_code": "US"
}
},
"height": 720,
"httpUserAgent": "Mozilla/...",
"isLoggedIn": true,
"launchCount": 19,
"model": "...",
"os": "Comcast...",
"osVersion": "...",
...
"platformTenantCode": "...",
"productCode": "...",
"requestOrigin": "https://....com",
"serverTimeUtc": 1617809474787,
"serviceCode": "...",
"serviceOriginated": false,
"sessionId": "guid",
"sessionSequence": 2,
"subtype": "...",
"tEventId": "...",
...
"tRegion": "us-east-1",
"timeZoneOffset": 5,
"timestamp": 1617809473305,
"traits": {
"isp": "Comcast Cable",
"organization": "..."
},
"type": "...",
"userId": "guid",
"version": "v1",
"width": 1280,
"xb3traceId": "guid"
}
我们使用 ObjectMapper
仅解析 json 的部分字段。这是 Telemetry
class 的样子:
public class Telemetry {
public String AppVersion;
public String CountryCode;
public String ClientId;
public String DeviceSerialNumber;
public String EventSource;
public String SessionId;
public TelemetrySubTypes SubType; //enum
public String TRegion;
public Long TimeStamp;
public TelemetryTypes Type; //enum
public String StateIso;
...
}
更新 #3
来源
子任务选项卡
ID | Bytes received | Records received | Bytes sent | Records sent | Status |
---|---|---|---|---|---|
0 | 0 B | 0 | 0 B | 0 | RUNNING |
1 | 0 B | 0 | 2.83 MB | 15,000 | RUNNING |
水印选项卡
无数据
Window
子任务选项卡
ID | Bytes received | Records received | Bytes sent | Records sent | Status |
---|---|---|---|---|---|
0 | 1.80 MB | 9,501 | 0 B | 0 | RUNNING |
1 | 1.04 MB | 5,499 | 0 B | 0 | RUNNING |
水印
SubTask | Watermark |
---|---|
1 | No Watermark |
2 | No Watermark |
根据您提供的评论和更多信息,问题似乎是两个 Flink 消费者无法从同一分区消费。因此,在您的情况下,只有一个运算符的并行实例将从 kafka 分区中消耗,而另一个将处于空闲状态。
一般来说,Flink 运算符会 select MIN([all_downstream_parallel_watermarks])
,所以在你的情况下,一个 Kafka 消费者会产生正常的 Watermarks 而另一个永远不会产生任何东西(在这种情况下,flink 假设 Long.Min
),因此 Flink 将 select 较低的 Long.Min
。因此,window 将永远不会被触发,因为当数据在流动时,永远不会生成一个水印。好的做法是在使用 Kafka 时使用与 Kafka 分区数相同的并行性。
在获得 AWS 人员的支持后 session 发现我们未能在流媒体环境中设置时间特征。
- 在 1.11.1 中,
TimeCharacteristic
的默认值为IngestionTime
。 - 从 1.12.1 开始(参见相关 release notes)默认值为
EventTime
:
In Flink 1.12 the default stream time characteristic has been changed to EventTime, thus you don’t need to call this method for enabling event-time support anymore.
因此,在我们明确设置 EventTime
之后,它开始像魅力一样生成水印:
streamingEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);