Apache Flink:在远程集群上执行扩展 RichFlatMapFunction 的程序会导致错误
Apache Flink: executing a program which extends the RichFlatMapFunction on the remote cluster causes error
我在 Apache Flink 中有以下代码。它在本地集群中工作正常,而 运行 它在远程集群上在包含命令 "stack.push(recordPair);".
的行中生成 NullPointerException 错误
有谁知道,是什么原因?
本地和远程集群的输入数据集相同。
public static class TC extends RichFlatMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
private static TreeSet<Tuple2<Integer, Integer>> treeSet_duplicate_pair ;
private static HashMap< Integer, Set<Integer>> clusters_duplicate_map ;
private static Stack<Tuple2< Integer,Integer>> stack ;
public TC(List<Tuple2<Integer, Integer>> duplicatsPairs) {
...
stack = new Stack<Tuple2< Integer,Integer>>();
}
@Override
public void flatMap(Tuple2<Integer, Integer> recordPair, Collector<Tuple2<Integer, Integer>> out) throws Exception {
if (recordPair!= null)
{
stack.push(recordPair);
...
}
}
问题是您在 TC
class 的构造函数中初始化了 stack
变量。这仅为客户端程序 运行s 所在的 JVM 初始化静态变量。对于本地执行,这是可行的,因为 Flink 作业是在同一个 JVM 中执行的。
当您 运行 它在集群上时,您的 TC
将被序列化并传送到集群节点。还有实例的反序列化不会再次调用构造函数来初始化stack
。为了使其工作,您应该将初始化逻辑移至 RichFlatMapFunction
的 open
方法或使用静态初始化器。但请注意,所有在同一个 TaskManager
上 运行 的运算符将共享 stack
的相同实例,因为它是一个 class 变量。
public static class TC extends RichFlatMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
private static TreeSet<Tuple2<Integer, Integer>> treeSet_duplicate_pair;
private static HashMap< Integer, Set<Integer>> clusters_duplicate_map;
// either use a static initializer
private static Stack<Tuple2< Integer,Integer>> stack = new Stack<Tuple2< Integer,Integer>>();
public TC(List<Tuple2<Integer, Integer>> duplicatsPairs) {
...
}
@Override
public void open(Configuration config) {
// or initialize stack here, but here you have to synchronize the initialization
...
}
@Override
public void flatMap(Tuple2<Integer, Integer> recordPair, Collector<Tuple2<Integer, Integer>> out) throws Exception {
if (recordPair!= null)
{
stack.push(recordPair);
...
}
}
}
我在 Apache Flink 中有以下代码。它在本地集群中工作正常,而 运行 它在远程集群上在包含命令 "stack.push(recordPair);".
的行中生成 NullPointerException 错误有谁知道,是什么原因?
本地和远程集群的输入数据集相同。
public static class TC extends RichFlatMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
private static TreeSet<Tuple2<Integer, Integer>> treeSet_duplicate_pair ;
private static HashMap< Integer, Set<Integer>> clusters_duplicate_map ;
private static Stack<Tuple2< Integer,Integer>> stack ;
public TC(List<Tuple2<Integer, Integer>> duplicatsPairs) {
...
stack = new Stack<Tuple2< Integer,Integer>>();
}
@Override
public void flatMap(Tuple2<Integer, Integer> recordPair, Collector<Tuple2<Integer, Integer>> out) throws Exception {
if (recordPair!= null)
{
stack.push(recordPair);
...
}
}
问题是您在 TC
class 的构造函数中初始化了 stack
变量。这仅为客户端程序 运行s 所在的 JVM 初始化静态变量。对于本地执行,这是可行的,因为 Flink 作业是在同一个 JVM 中执行的。
当您 运行 它在集群上时,您的 TC
将被序列化并传送到集群节点。还有实例的反序列化不会再次调用构造函数来初始化stack
。为了使其工作,您应该将初始化逻辑移至 RichFlatMapFunction
的 open
方法或使用静态初始化器。但请注意,所有在同一个 TaskManager
上 运行 的运算符将共享 stack
的相同实例,因为它是一个 class 变量。
public static class TC extends RichFlatMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
private static TreeSet<Tuple2<Integer, Integer>> treeSet_duplicate_pair;
private static HashMap< Integer, Set<Integer>> clusters_duplicate_map;
// either use a static initializer
private static Stack<Tuple2< Integer,Integer>> stack = new Stack<Tuple2< Integer,Integer>>();
public TC(List<Tuple2<Integer, Integer>> duplicatsPairs) {
...
}
@Override
public void open(Configuration config) {
// or initialize stack here, but here you have to synchronize the initialization
...
}
@Override
public void flatMap(Tuple2<Integer, Integer> recordPair, Collector<Tuple2<Integer, Integer>> out) throws Exception {
if (recordPair!= null)
{
stack.push(recordPair);
...
}
}
}