Spark 到 Flink 的并行化方法

Spark to Flink parallelize method

我是 Flink 的新手,目前正在将 Spark 中的一些代码示例转换为 Flink。 JavaSparkContext中的parallelize方法在Flink中有什么类似的功能?我尝试转换以下代码:

JavaRDD<Integer> workload = ctx.parallelize(Arrays.asList(init_val), parallel).map(new Function<Integer, Integer>() {
      @Override
      public Integer call(Integer s) throws InterruptedException {
        Thread.sleep(s * 1000);
        return 0;
      }
    });

Flink 等价于 JavaSparkContext.parallelize()ExecutionEnvironment.fromCollection().

所以你的代码片段应该翻译成这样:

// get execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// create data set from collection
DataSet<Integer> input = env.fromCollection(Arrays.asList(init_val));
// apply map function
DataSet<Integer> result = input.map(new MapFunction<Integer, Integer>() {
  @Override
  public Integer map(Integer s) {
    Thread.sleep(s * 1000);
    return 0;
  }
}).setParallelism(parallel); // set parallelism of map function

您将使用 ExecutionEnvironment 提供的 fromCollection 方法。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Integer> input = env.fromCollection(inputList);