如何使变量对 Apache Flink 中的所有 TaskManager 可用?
How to make a variable available to all of the TaskManagers in Apache Flink?
我需要在我的程序中设置值列表并在所有任务管理器中访问它们。目前,我在主 class 中声明了一个 public 字段并设置了值。稍后在我的程序中,在远程集群中将是 运行,我想在所有任务管理器中访问这个变量。
这是我的示例代码。但是似乎存在问题:没有任何编译或 运行 时错误,任务管理器无法使用这些值。
public class myMainClass {
public static ArrayList<String> mykey = new ArrayList<String>();
public static void main(String[] args) throws Exception {
// assign value to the variable
partitionedData = partitionedData.partitionCustom(new MyPartitioner(myKey), 2);
}
}
public static class MyPartitioner implements Partitioner<String> {
public String [] partitionKeys;
public static ArrayList<String> mykey;
public MyPartitioner(ArrayList<String> mykey) {
this.mykey = mykey;
}
@Override
public int partition(String key, int numPartitions) {
for (int i=0 ; i< numParalell-1 ; i++) {
if(mykey.get(i).compareToIgnoreCase(key) > 0)
return i;
}
return numParalell-1 ;
}
}
我会将 mykey
列表作为构造函数参数传递给 MyPartitioner
class。
您的代码如下所示:
public class myMainClass {
public static void main(String[] args) throws Exception {
ArrayList<String> mykey = new ArrayList<String>();
// assign value to the vaiable
partitionedData = partitionedData.partitionCustom(new MyPartitioner(mykey), 2);
}
}
public static class MyPartitioner implements Partitioner<String> {
private final ArrayList<String> mykey;
public String [] partitionKeys;
public MyPartitioner(ArrayList<String> mykey) {
this.mykey = mykey;
}
@Override
public int partition(String key, int numPartitions) {
for (int i=0 ; i< numParalell-1 ; i++) {
if(mykey.get(i).compareToIgnoreCase(key) > 0)
return i;
}
return numParalell-1 ;
}
}
我不确定你想要完成什么。如果您想 pre-compute 一个 (non-changing) 值并将其分发给所有任务管理器(我假设您需要在某些运算符中访问这些值),您可以通过构造函数参数将这些值简单地提供给您的 UDF或者使用 Flink 的广播变量:https://ci.apache.org/projects/flink/flink-docs-release-0.8/programming_guide.html#broadcast-variables
我需要在我的程序中设置值列表并在所有任务管理器中访问它们。目前,我在主 class 中声明了一个 public 字段并设置了值。稍后在我的程序中,在远程集群中将是 运行,我想在所有任务管理器中访问这个变量。 这是我的示例代码。但是似乎存在问题:没有任何编译或 运行 时错误,任务管理器无法使用这些值。
public class myMainClass {
public static ArrayList<String> mykey = new ArrayList<String>();
public static void main(String[] args) throws Exception {
// assign value to the variable
partitionedData = partitionedData.partitionCustom(new MyPartitioner(myKey), 2);
}
}
public static class MyPartitioner implements Partitioner<String> {
public String [] partitionKeys;
public static ArrayList<String> mykey;
public MyPartitioner(ArrayList<String> mykey) {
this.mykey = mykey;
}
@Override
public int partition(String key, int numPartitions) {
for (int i=0 ; i< numParalell-1 ; i++) {
if(mykey.get(i).compareToIgnoreCase(key) > 0)
return i;
}
return numParalell-1 ;
}
}
我会将 mykey
列表作为构造函数参数传递给 MyPartitioner
class。
您的代码如下所示:
public class myMainClass {
public static void main(String[] args) throws Exception {
ArrayList<String> mykey = new ArrayList<String>();
// assign value to the vaiable
partitionedData = partitionedData.partitionCustom(new MyPartitioner(mykey), 2);
}
}
public static class MyPartitioner implements Partitioner<String> {
private final ArrayList<String> mykey;
public String [] partitionKeys;
public MyPartitioner(ArrayList<String> mykey) {
this.mykey = mykey;
}
@Override
public int partition(String key, int numPartitions) {
for (int i=0 ; i< numParalell-1 ; i++) {
if(mykey.get(i).compareToIgnoreCase(key) > 0)
return i;
}
return numParalell-1 ;
}
}
我不确定你想要完成什么。如果您想 pre-compute 一个 (non-changing) 值并将其分发给所有任务管理器(我假设您需要在某些运算符中访问这些值),您可以通过构造函数参数将这些值简单地提供给您的 UDF或者使用 Flink 的广播变量:https://ci.apache.org/projects/flink/flink-docs-release-0.8/programming_guide.html#broadcast-variables