Throughput and Latency on Apache Flink
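I have written a very simple Java program for Apache Flink, and now I am interested in measuring statistics such as throughput (the number of tuples processed per second) and latency (the time the program needs to process each input tuple).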
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.readTextFile("/home/LizardKing/Documents/Power/Prova.csv")
    .map(new MyMapper())
    .writeAsCsv("/home/LizardKing/Results.csv");
JobExecutionResult res = env.execute();
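I know that Flink exposes some metrics:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/metrics.html
But I am not sure how to use them to get what I want. From that link I read that a "meter" can be used to measure average throughput, but once I have defined one, how should I use it?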
We use custom metrics, such as meters and gauges, in production streaming jobs running on YARN.
The steps are as follows.
Add an extra dependency to pom.xml:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-metrics-dropwizard</artifactId>
    <version>${flink.version}</version>
</dependency>
We are using version 1.2.1.
Then add the meter to the MyMapper class:
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Meter;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Test {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env
            .readTextFile("/home/LizardKing/Documents/Power/Prova.csv")
            .map(new MyMapper())
            // writeAsCsv only accepts streams of tuples; this mapper emits
            // plain strings, so the result is written with writeAsText.
            .writeAsText("/home/LizardKing/Results.csv");

        JobExecutionResult res = env.execute();
    }

    private static class MyMapper extends RichMapFunction<String, String> {

        // Metrics are not serializable, so the field is transient and is
        // created in open() on each parallel task.
        private transient Meter meter;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // Register a meter named "myMeter" in this operator's metric group.
            this.meter = getRuntimeContext()
                .getMetricGroup()
                .meter("myMeter", new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
        }

        @Override
        public String map(String value) throws Exception {
            // Count one event per record that passes through the mapper.
            this.meter.markEvent();
            return value;
        }
    }
}
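The meter itself only counts events and turns the count into a rate. You read it through whatever metric reporter your cluster is configured with (see the metrics page linked in the question). To make the arithmetic concrete, and to cover the latency half of the question, here is a minimal plain-Java sketch with no Flink dependency. MapperStats and all of its methods are hypothetical names made up for illustration, not part of Flink or Dropwizard. It also assumes that "latency" means the time spent inside the map function for one record, which is not the same as end-to-end latency through the whole job.

```java
import java.util.function.Function;

/** Plain-Java sketch (hypothetical helper, not a Flink API) of per-record accounting. */
public class MapperStats {

    private long count;       // number of records recorded so far
    private long totalNanos;  // total time spent inside the timed function

    /** Applies fn to value and records how long the call took. */
    public <I, O> O timed(Function<I, O> fn, I value) {
        long start = System.nanoTime();
        O out = fn.apply(value);
        record(System.nanoTime() - start);
        return out;
    }

    /** Records one processed record that took elapsedNanos nanoseconds. */
    public void record(long elapsedNanos) {
        count++;
        totalNanos += elapsedNanos;
    }

    public long getCount() {
        return count;
    }

    /** Mean processing time per record in milliseconds, or 0 if nothing was recorded. */
    public double meanLatencyMillis() {
        return count == 0 ? 0.0 : (totalNanos / 1e6) / count;
    }

    /** Average number of events per second over a window of windowNanos nanoseconds. */
    public static double ratePerSecond(long events, long windowNanos) {
        if (windowNanos <= 0) {
            throw new IllegalArgumentException("windowNanos must be positive");
        }
        return events * 1e9 / windowNanos;
    }

    public static void main(String[] args) {
        MapperStats stats = new MapperStats();
        long start = System.nanoTime();
        for (int i = 0; i < 100_000; i++) {
            // Stand-in for the body of map(); any per-record work goes here.
            stats.timed(String::toUpperCase, "line-" + i);
        }
        long window = System.nanoTime() - start;
        System.out.printf("records=%d throughput=%.0f/s meanLatency=%.6f ms%n",
            stats.getCount(),
            ratePerSecond(stats.getCount(), window),
            stats.meanLatencyMillis());
    }
}
```

Inside a real job you would call something like record() from map() and expose the values through a registered metric rather than printing them. The numbers depend entirely on the machine and the input, so treat this as a way to see what is being measured, not as a benchmark.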
Hope this helps.