使用 Flink CEP 测量事件时间延迟
Measuring event-time latency with Flink CEP
我已经用 Flink CEP 实现了一个匹配三个事件的模式,例如 A->B->C
。在我定义了我的模式之后,我生成了一个
PatternStream<Event> patternStream = CEP.pattern(eventStream, pattern);
PatternSelectFunction
使得
patternStream.select(new MyPatternSelectFunction()).print();
这很有用,但我对所有匹配事件的事件时间感兴趣。我知道传统的 Flink streaming API 提供了丰富的功能,允许您按照 中所述注册 Flink 的内部延迟跟踪器。我还看到 Flink 1.8 添加了一个新的 RichPatternSelectFunction
。但不幸的是,我无法使用 Flink CEP 设置 Flink 1.8。
最后,有没有办法获取所有匹配事件的事件时间?
您不需要 Rich Functions 就可以使用 Flink 的延迟跟踪。您只需要通过在 Flink 配置或 ExecutionConfig 中将 latencyTrackingInterval
设置为正数来启用它,例如
env.getConfig().setLatencyTrackingInterval(1000);
然后您可以在指标解决方案中或通过 REST api 观察结果(Flink 网络中未报告延迟指标 UI)。
更新:
延迟统计数据是作业指标,在
返回的列表中
http://<job_manager_rest_endpoint>/jobs/<job_id>/metrics
可以从
获取延迟指标值
http://<job_manager_rest_endpoint>/jobs/<job_id>/metrics?get=<metric_name>
这些指标的名称类似于
latency.source_id.<ID>.operator_id.<ID>.operator_subtask_index.<SUBTASK>.<metric>
其中 ID 标识作业图中的源节点和运算符节点,在这些节点之间测量延迟。
例如,我可以通过此请求确定我现在 运行 的作业中源和接收器之一之间的第 95 个百分位延迟:
http://localhost:8081/jobs/94b189a96b98b3aafaba6db6aa8b770b/metrics?get=latency.source_id.bc764cd8ddf7a0cff126f51c16239658.operator_id.fd0ee602f2fa8d310d9bd9f694e185f5.operator_subtask_index.0.latency_p95
或者,您可以使用 ProcessFunction 在事件进入作业的 CEP 部分之前将处理时间时间戳添加到您的事件中,然后使用另一个 ProcessFunction 来测量经过的时间。
我已经用 Flink CEP 实现了一个匹配三个事件的模式,例如 A->B->C
。在我定义了我的模式之后,我生成了一个
PatternStream<Event> patternStream = CEP.pattern(eventStream, pattern);
PatternSelectFunction
使得
patternStream.select(new MyPatternSelectFunction()).print();
这很有用,但我对所有匹配事件的事件时间感兴趣。我知道传统的 Flink streaming API 提供了丰富的功能,允许您按照 RichPatternSelectFunction
。但不幸的是,我无法使用 Flink CEP 设置 Flink 1.8。
最后,有没有办法获取所有匹配事件的事件时间?
您不需要 Rich Functions 就可以使用 Flink 的延迟跟踪。您只需要通过在 Flink 配置或 ExecutionConfig 中将 latencyTrackingInterval
设置为正数来启用它,例如
env.getConfig().setLatencyTrackingInterval(1000);
然后您可以在指标解决方案中或通过 REST api 观察结果(Flink 网络中未报告延迟指标 UI)。
更新:
延迟统计数据是作业指标,在
返回的列表中http://<job_manager_rest_endpoint>/jobs/<job_id>/metrics
可以从
获取延迟指标值http://<job_manager_rest_endpoint>/jobs/<job_id>/metrics?get=<metric_name>
这些指标的名称类似于
latency.source_id.<ID>.operator_id.<ID>.operator_subtask_index.<SUBTASK>.<metric>
其中 ID 标识作业图中的源节点和运算符节点,在这些节点之间测量延迟。
例如,我可以通过此请求确定我现在 运行 的作业中源和接收器之一之间的第 95 个百分位延迟:
http://localhost:8081/jobs/94b189a96b98b3aafaba6db6aa8b770b/metrics?get=latency.source_id.bc764cd8ddf7a0cff126f51c16239658.operator_id.fd0ee602f2fa8d310d9bd9f694e185f5.operator_subtask_index.0.latency_p95
或者,您可以使用 ProcessFunction 在事件进入作业的 CEP 部分之前将处理时间时间戳添加到您的事件中,然后使用另一个 ProcessFunction 来测量经过的时间。