Kafka 和 Flink 环境下如何测试性能?
How performance can be tested in Kafka and Flink environment?
以kafka为输入源的Flink如何进行性能测试。另外,如果有任何性能测试工具可用于这种情况,请推荐。
Flink 包括吞吐量(numRecordsInPerSecond 和 numRecordsOutPerSecond)和 latency 的指标。
如果您想更仔细地测量端到端延迟,您可以在接收器(或其他终端节点)中添加自定义指标,将事件中的时间戳与当前时间进行比较。那看起来像这样:
public class LatencyMeasuringSink<T> extends RichSinkFunction<T> {
private transient DescriptiveStatisticsHistogram eventTimeLag;
private static final int EVENT_TIME_LAG_WINDOW_SIZE = 10_000;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
eventTimeLag = getRuntimeContext().getMetricGroup().histogram("eventTimeLag",
new DescriptiveStatisticsHistogram(EVENT_TIME_LAG_WINDOW_SIZE));
}
@Override
public void invoke(T dataPoint, Context context) throws Exception {
eventTimeLag.update(System.currentTimeMillis() - dataPoint.getTimeStampMs());
}
}
您可能想要配置您的 Kafka 生产者以在您的事件中放置 LogAppendTime
时间戳,并使用它们作为比较的基础。当然,这是假设所涉及的不同机器中的时钟同步得足够好,以使该测量有意义——或者您可以 运行 在一台机器上进行测试。
FLIP-83: Flink End-to-end Performance Testing Framework 可能也很有趣。这项工作正在进行中。
以kafka为输入源的Flink如何进行性能测试。另外,如果有任何性能测试工具可用于这种情况,请推荐。
Flink 包括吞吐量(numRecordsInPerSecond 和 numRecordsOutPerSecond)和 latency 的指标。
如果您想更仔细地测量端到端延迟,您可以在接收器(或其他终端节点)中添加自定义指标,将事件中的时间戳与当前时间进行比较。那看起来像这样:
public class LatencyMeasuringSink<T> extends RichSinkFunction<T> {
private transient DescriptiveStatisticsHistogram eventTimeLag;
private static final int EVENT_TIME_LAG_WINDOW_SIZE = 10_000;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
eventTimeLag = getRuntimeContext().getMetricGroup().histogram("eventTimeLag",
new DescriptiveStatisticsHistogram(EVENT_TIME_LAG_WINDOW_SIZE));
}
@Override
public void invoke(T dataPoint, Context context) throws Exception {
eventTimeLag.update(System.currentTimeMillis() - dataPoint.getTimeStampMs());
}
}
您可能想要配置您的 Kafka 生产者以在您的事件中放置 LogAppendTime
时间戳,并使用它们作为比较的基础。当然,这是假设所涉及的不同机器中的时钟同步得足够好,以使该测量有意义——或者您可以 运行 在一台机器上进行测试。
FLIP-83: Flink End-to-end Performance Testing Framework 可能也很有趣。这项工作正在进行中。