Flink:Dataset 和 Datastream API 在一个程序中。可能吗?
Flink: Dataset and Datastream API in one program. Is it possible?
我想先使用数据集 API 处理静态数据,然后使用 DataStream API 来 运行 流作业。如果我在 IDE 上编写代码,它会完美运行。但是当我在本地 flink jobmanager(所有并行度 1)上尝试 运行ning 时,流代码永远不会执行!
例如,以下代码不起作用:
val parallelism = 1
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(parallelism)
val envStatic = ExecutionEnvironment.getExecutionEnvironment
envStatic.setParallelism(parallelism)
val myStaticData = envStatic.fromCollection(1 to 10)
val myVal: Int = myStaticData.reduce(_ + _).collect().head
val theStream = env.fromElements(1).iterate( iteretion => {
val result = iteretion.map(x => x + myVal)
(result, result)
})
theStream.print()
env.execute("static and streaming together")
我应该怎样做才能使它正常工作?
日志:execution logs for above program
执行计划:plan
似乎是非循环的。
如果你有一个由多个子作业组成的 Flink 作业,例如由 count
、collect
或 print
触发,则无法通过 Web 界面提交作业。 Web 界面仅支持单个 Flink 作业。
我想先使用数据集 API 处理静态数据,然后使用 DataStream API 来 运行 流作业。如果我在 IDE 上编写代码,它会完美运行。但是当我在本地 flink jobmanager(所有并行度 1)上尝试 运行ning 时,流代码永远不会执行!
例如,以下代码不起作用:
val parallelism = 1
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(parallelism)
val envStatic = ExecutionEnvironment.getExecutionEnvironment
envStatic.setParallelism(parallelism)
val myStaticData = envStatic.fromCollection(1 to 10)
val myVal: Int = myStaticData.reduce(_ + _).collect().head
val theStream = env.fromElements(1).iterate( iteretion => {
val result = iteretion.map(x => x + myVal)
(result, result)
})
theStream.print()
env.execute("static and streaming together")
我应该怎样做才能使它正常工作?
日志:execution logs for above program
执行计划:plan 似乎是非循环的。
如果你有一个由多个子作业组成的 Flink 作业,例如由 count
、collect
或 print
触发,则无法通过 Web 界面提交作业。 Web 界面仅支持单个 Flink 作业。