在 Flink 中使用计数器获取 numOfRecordsIn
getting numOfRecordsIn using counters in Flink
我想在 Flink 中为操作员显示 numRecordsIn,为此我一直在关注 here 上数据工匠的 ppt。计数器代码如下
public static class mapper extends RichMapFunction<String,String>{
public Counter counter;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.counter = getRuntimeContext()
.getMetricGroup()
.counter("numRecordsIn");
}
@Override
public String map(String s) throws Exception {
counter.inc();
System.out.println("counter val " + counter.toString());
return null;
}
}
问题是如何指定要显示的运算符number_of_Records_In?
指标计数器通过 Flink 的指标系统公开。为了查看它们,您必须配置一个指标报告程序。可以找到有关如何注册指标报告程序的说明 here。
Flink 包含许多 built-in 指标,包括 numRecordsIn。因此,如果这就是您要测量的内容,则无需编写任何代码来实现该特定测量。对于 numRecordsInPerSecond 和许多其他人也是如此。
您询问的代码导致 numRecordsIn 计数器针对正在使用指标的运算符递增。
更好地理解指标系统的一个好方法是提出一个简单的流作业并查看 Flink 网络中的指标 ui。我还发现在作业 运行.
时查询监视 REST api 真的很有帮助
我想在 Flink 中为操作员显示 numRecordsIn,为此我一直在关注 here 上数据工匠的 ppt。计数器代码如下
public static class mapper extends RichMapFunction<String,String>{
public Counter counter;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.counter = getRuntimeContext()
.getMetricGroup()
.counter("numRecordsIn");
}
@Override
public String map(String s) throws Exception {
counter.inc();
System.out.println("counter val " + counter.toString());
return null;
}
}
问题是如何指定要显示的运算符number_of_Records_In?
指标计数器通过 Flink 的指标系统公开。为了查看它们,您必须配置一个指标报告程序。可以找到有关如何注册指标报告程序的说明 here。
Flink 包含许多 built-in 指标,包括 numRecordsIn。因此,如果这就是您要测量的内容,则无需编写任何代码来实现该特定测量。对于 numRecordsInPerSecond 和许多其他人也是如此。
您询问的代码导致 numRecordsIn 计数器针对正在使用指标的运算符递增。
更好地理解指标系统的一个好方法是提出一个简单的流作业并查看 Flink 网络中的指标 ui。我还发现在作业 运行.
时查询监视 REST api 真的很有帮助