在第二次计算中重用第一次计算的结果
Reuse results of first computation in second computation
我正在尝试在 Flink 中编写一个需要两个阶段的计算。
在第一阶段,我从一个文本文件开始,执行一些参数估计,结果获得一个 Java 表示数据统计模型的对象。
在第二阶段,我想用这个对象来生成模拟数据。
我不确定该怎么做。我尝试使用 LocalCollectionOutputFormat
,它在本地工作,但是当我在集群上部署作业时,我得到了 NullPointerException
- 这并不奇怪。
Flink 的做法是什么?
这是我的代码:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
GlobalConfiguration.includeConfiguration(configuration);
// Phase 1: read file and estimate model
DataSource<Tuple4<String, String, String, String>> source = env
.readCsvFile(args[0])
.types(String.class, String.class, String.class, String.class);
List<Tuple4<Bayes, Bayes, Bayes, Bayes>> bayesResult = new ArrayList<>();
// Processing here...
....output(new LocalCollectionOutputFormat<>(bayesResult));
env.execute("Bayes");
DataSet<BTP> btp = env
.createInput(new BayesInputFormat(bayesResult.get(0)))
// Phase 2: BayesInputFormat generates data for further calculations
// ....
这是我得到的异常:
Error: The program execution failed: java.lang.NullPointerException
at org.apache.flink.api.java.io.LocalCollectionOutputFormat.close(LocalCollectionOutputFormat.java:86)
at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:176)
at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
at java.lang.Thread.run(Thread.java:745)
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: java.lang.NullPointerException
at org.apache.flink.api.java.io.LocalCollectionOutputFormat.close(LocalCollectionOutputFormat.java:86)
at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:176)
at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
at java.lang.Thread.run(Thread.java:745)
at org.apache.flink.client.program.Client.run(Client.java:328)
at org.apache.flink.client.program.Client.run(Client.java:294)
at org.apache.flink.client.program.Client.run(Client.java:288)
at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:55)
at it.list.flink.test.Test01.main(Test01.java:62)
...
在最新版本 (0.9-milestone-1) 中,一个 collect()
方法被添加到 Flink
public List<T> collect()
将 DataSet<T>
作为 List<T>
提取到驱动程序。 collect()
也会触发程序立即执行(不需要调用 ExecutionEnvironment.execute()
)。目前,大约 10 MB 的数据集有大小限制。
如果您不评估驱动程序中的模型,您也可以将两个程序链接在一起,并通过附加数据接收器将模型发送到一边。这会更有效率,因为数据不会在客户端机器上往返。
如果您使用的是 0.9 之前的 Flink,您可以使用以下代码片段将数据集收集到本地集合:
val dataJavaList = new ArrayList[K]
val outputFormat = new LocalCollectionOutputFormat[K](dataJavaList)
dataset.output(outputFormat)
env.execute("collect()")
其中 K
是您要收集的对象类型
我正在尝试在 Flink 中编写一个需要两个阶段的计算。
在第一阶段,我从一个文本文件开始,执行一些参数估计,结果获得一个 Java 表示数据统计模型的对象。
在第二阶段,我想用这个对象来生成模拟数据。
我不确定该怎么做。我尝试使用 LocalCollectionOutputFormat
,它在本地工作,但是当我在集群上部署作业时,我得到了 NullPointerException
- 这并不奇怪。
Flink 的做法是什么?
这是我的代码:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
GlobalConfiguration.includeConfiguration(configuration);
// Phase 1: read file and estimate model
DataSource<Tuple4<String, String, String, String>> source = env
.readCsvFile(args[0])
.types(String.class, String.class, String.class, String.class);
List<Tuple4<Bayes, Bayes, Bayes, Bayes>> bayesResult = new ArrayList<>();
// Processing here...
....output(new LocalCollectionOutputFormat<>(bayesResult));
env.execute("Bayes");
DataSet<BTP> btp = env
.createInput(new BayesInputFormat(bayesResult.get(0)))
// Phase 2: BayesInputFormat generates data for further calculations
// ....
这是我得到的异常:
Error: The program execution failed: java.lang.NullPointerException
at org.apache.flink.api.java.io.LocalCollectionOutputFormat.close(LocalCollectionOutputFormat.java:86)
at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:176)
at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
at java.lang.Thread.run(Thread.java:745)
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: java.lang.NullPointerException
at org.apache.flink.api.java.io.LocalCollectionOutputFormat.close(LocalCollectionOutputFormat.java:86)
at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:176)
at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
at java.lang.Thread.run(Thread.java:745)
at org.apache.flink.client.program.Client.run(Client.java:328)
at org.apache.flink.client.program.Client.run(Client.java:294)
at org.apache.flink.client.program.Client.run(Client.java:288)
at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:55)
at it.list.flink.test.Test01.main(Test01.java:62)
...
在最新版本 (0.9-milestone-1) 中,一个 collect()
方法被添加到 Flink
public List<T> collect()
将 DataSet<T>
作为 List<T>
提取到驱动程序。 collect()
也会触发程序立即执行(不需要调用 ExecutionEnvironment.execute()
)。目前,大约 10 MB 的数据集有大小限制。
如果您不评估驱动程序中的模型,您也可以将两个程序链接在一起,并通过附加数据接收器将模型发送到一边。这会更有效率,因为数据不会在客户端机器上往返。
如果您使用的是 0.9 之前的 Flink,您可以使用以下代码片段将数据集收集到本地集合:
val dataJavaList = new ArrayList[K]
val outputFormat = new LocalCollectionOutputFormat[K](dataJavaList)
dataset.output(outputFormat)
env.execute("collect()")
其中 K
是您要收集的对象类型