在一个 flink 作业中使用 collect() 和 env.execute()
use collect() and env.execute() in one flink job
我正在尝试在 Flink 中编写一个需要两个阶段的计算。
在第一阶段,我创建一个图并获取其顶点 ID:
List<String> ids = graph.getVertexIds().collect();
在第二阶段,我想将这些 id 用于每个顶点的 运行 SingleSourceShortestPath。
for (String id: ids){
System.out.println("Source Id: "+id);
graph.run( new SingleSourceShortestPaths<String, String>(id, 10)).print();
}
它在本地工作(在 IntelliJ IDE 和使用 ./bin/flink run ...
的命令行中),但是当我使用它的 WebUI 在 Flink 上提交作业时,程序只执行直到 collect()
方法并且不 运行 程序的剩余部分(对于语句和 print()
)。
有什么问题?
这是我的代码:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.library.SingleSourceShortestPaths;
import java.util.ArrayList;
import java.util.List;
public class Main {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Edge<String, Double> e1 = new Edge<String, Double>("1", "2", 0.5);
Edge<String, Double> e2 = new Edge<String, Double>("2", "3", 0.5);
Edge<String, Double> e3 = new Edge<String, Double>("4", "5", 0.5);
Edge<String, Double> e4 = new Edge<String, Double>("5", "6", 0.5);
Edge<String, Double> e5 = new Edge<String, Double>("7", "8", 0.5);
List<Edge<String, Double>> edgeList = new ArrayList<Edge<String, Double>>();
edgeList.add(e1);
edgeList.add(e2);
edgeList.add(e3);
edgeList.add(e4);
edgeList.add(e5);
Graph<String, String, Double> graph = Graph.fromCollection(edgeList,
new MapFunction<String, String>() {
public String map(String value) {
return value;
}
}, env);
List<String> ids = graph.getVertexIds().collect();
for (String id: ids){
System.out.println("Source Id: "+id);
graph.run( new SingleSourceShortestPaths<String, String>(id, 10)).print();
}
}
}
基于此 link,Flink 转换是惰性的,这意味着它们不会执行,直到 sink 操作已调用。
Flink中的一个sink操作触发一个stream的执行产生程序想要的结果,比如保存结果到文件系统或打印到标准输出
Dataset.collect()
、Dataset.Count()
和 Dataset.print()
等方法是触发实际数据转换的接收器操作。
我正在尝试在 Flink 中编写一个需要两个阶段的计算。
在第一阶段,我创建一个图并获取其顶点 ID:
List<String> ids = graph.getVertexIds().collect();
在第二阶段,我想将这些 id 用于每个顶点的 运行 SingleSourceShortestPath。
for (String id: ids){
System.out.println("Source Id: "+id);
graph.run( new SingleSourceShortestPaths<String, String>(id, 10)).print();
}
它在本地工作(在 IntelliJ IDE 和使用 ./bin/flink run ...
的命令行中),但是当我使用它的 WebUI 在 Flink 上提交作业时,程序只执行直到 collect()
方法并且不 运行 程序的剩余部分(对于语句和 print()
)。
有什么问题?
这是我的代码:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.library.SingleSourceShortestPaths;
import java.util.ArrayList;
import java.util.List;
public class Main {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Edge<String, Double> e1 = new Edge<String, Double>("1", "2", 0.5);
Edge<String, Double> e2 = new Edge<String, Double>("2", "3", 0.5);
Edge<String, Double> e3 = new Edge<String, Double>("4", "5", 0.5);
Edge<String, Double> e4 = new Edge<String, Double>("5", "6", 0.5);
Edge<String, Double> e5 = new Edge<String, Double>("7", "8", 0.5);
List<Edge<String, Double>> edgeList = new ArrayList<Edge<String, Double>>();
edgeList.add(e1);
edgeList.add(e2);
edgeList.add(e3);
edgeList.add(e4);
edgeList.add(e5);
Graph<String, String, Double> graph = Graph.fromCollection(edgeList,
new MapFunction<String, String>() {
public String map(String value) {
return value;
}
}, env);
List<String> ids = graph.getVertexIds().collect();
for (String id: ids){
System.out.println("Source Id: "+id);
graph.run( new SingleSourceShortestPaths<String, String>(id, 10)).print();
}
}
}
基于此 link,Flink 转换是惰性的,这意味着它们不会执行,直到 sink 操作已调用。
Flink中的一个sink操作触发一个stream的执行产生程序想要的结果,比如保存结果到文件系统或打印到标准输出
Dataset.collect()
、Dataset.Count()
和 Dataset.print()
等方法是触发实际数据转换的接收器操作。