使用 Java 使用 Apache Flink 减少 Pojo 字段
Reduce on Pojo field with Apache Flink using Java
我目前正在为一些分布式处理工具构建一个基准测试工具,并且在使用 Apache Flink 时遇到了一些问题。
设置很简单:LogPojo 是一个简单的 Pojo,具有三个字段(长日期、双精度值、字符串数据)。在列表中,我正在寻找具有最小 "value" 字段的一个 LogPojo。基本上相当于:
pojoList.stream().min(new LogPojo.Comp()).get().getValue();
我的 flink 设置如下:
public double processLogs(List<LogPojo> logs) {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<LogPojo> logSet = env.fromCollection(logs);
double result = 0.0;
try {
ReduceOperator ro = logSet.reduce(new LogReducer());
List<LogPojo> c = ro.collect();
result = c.get(0).getValue();
} catch (Exception ex) {
System.out.println("Exception caught" + ex);
}
return result;
}
public class LogReducer implements ReduceFunction<LogPojo> {
@Override
public LogPojo reduce(LogPojo o1, LogPojo o2) {
return (o1.getValue() < o2.getValue()) ? o1 : o2;
}
}
它停止于:
Exception in thread "main" java.lang.NoSuchMethodError: scala.collection.immutable.HashSet$.empty()Lscala/collection/immutable/HashSet;
所以不知何故它似乎无法应用reduce功能。我就是找不到,为什么。有什么提示吗?
首先你应该检查你的导入。您从 Scala class 中得到一个异常,但您的程序是在 Java 中实现的。您可能不小心导入了 Scala 数据集 API。使用 Java API 不应导致 Scala 异常(除非您使用依赖于 Scala 的 classes)。
尽管如此,Flink 内置了min
、max
等聚合方法
DataSet<LogPojo> logSet = env.fromCollection(logs);
// map LogPojo to a Tuple1<Double>
// (Flink's built-in aggregation functions work only on Tuple types)
DataSet<Tuple1<Double>> values = logSet.map(new MapFunction<LogPojo, Tuple1<Double>>() {
@Override
public Tuple1<Double> map(LogPojo l) throws Exception {
return new Tuple1<>(l.value);
}
});
// fetch the min value (at position 0 in the Tuple)
List<Tuple1<Double>> c = values.min(0).collect();
// get the first field of the Tuple
Double minVal = c.get(0).f0;
我目前正在为一些分布式处理工具构建一个基准测试工具,并且在使用 Apache Flink 时遇到了一些问题。
设置很简单:LogPojo 是一个简单的 Pojo,具有三个字段(长日期、双精度值、字符串数据)。在列表中,我正在寻找具有最小 "value" 字段的一个 LogPojo。基本上相当于:
pojoList.stream().min(new LogPojo.Comp()).get().getValue();
我的 flink 设置如下:
public double processLogs(List<LogPojo> logs) {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<LogPojo> logSet = env.fromCollection(logs);
double result = 0.0;
try {
ReduceOperator ro = logSet.reduce(new LogReducer());
List<LogPojo> c = ro.collect();
result = c.get(0).getValue();
} catch (Exception ex) {
System.out.println("Exception caught" + ex);
}
return result;
}
public class LogReducer implements ReduceFunction<LogPojo> {
@Override
public LogPojo reduce(LogPojo o1, LogPojo o2) {
return (o1.getValue() < o2.getValue()) ? o1 : o2;
}
}
它停止于:
Exception in thread "main" java.lang.NoSuchMethodError: scala.collection.immutable.HashSet$.empty()Lscala/collection/immutable/HashSet;
所以不知何故它似乎无法应用reduce功能。我就是找不到,为什么。有什么提示吗?
首先你应该检查你的导入。您从 Scala class 中得到一个异常,但您的程序是在 Java 中实现的。您可能不小心导入了 Scala 数据集 API。使用 Java API 不应导致 Scala 异常(除非您使用依赖于 Scala 的 classes)。
尽管如此,Flink 内置了min
、max
等聚合方法
DataSet<LogPojo> logSet = env.fromCollection(logs);
// map LogPojo to a Tuple1<Double>
// (Flink's built-in aggregation functions work only on Tuple types)
DataSet<Tuple1<Double>> values = logSet.map(new MapFunction<LogPojo, Tuple1<Double>>() {
@Override
public Tuple1<Double> map(LogPojo l) throws Exception {
return new Tuple1<>(l.value);
}
});
// fetch the min value (at position 0 in the Tuple)
List<Tuple1<Double>> c = values.min(0).collect();
// get the first field of the Tuple
Double minVal = c.get(0).f0;