Spark to Flink: parallelization method

unftdfkk · asked 2021-06-25 · in Flink

I am new to Flink and am currently converting some Spark code samples to Flink. What is the equivalent of JavaSparkContext in Flink? I am trying to convert the following code:

JavaRDD<Integer> workload = ctx.parallelize(Arrays.asList(init_val), parallel)
    .map(new Function<Integer, Integer>() {
      @Override
      public Integer call(Integer s) throws InterruptedException {
        Thread.sleep(s * 1000);
        return 0;
      }
    });

Answer 1

You would use the fromCollection method of the ExecutionEnvironment:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Integer> input = env.fromCollection(inputList);
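As a side note not stated in the answer: fromCollection infers the element type from the collection, and ExecutionEnvironment also offers fromElements as a shortcut when the values are known inline. A minimal sketch (class name and values are illustrative; print() triggers execution for DataSet programs):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class FromElementsExample {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // fromElements avoids building a List first (values illustrative)
    DataSet<Integer> input = env.fromElements(1, 2, 3);
    input.print(); // for DataSet programs, print() executes the job
  }
}
```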

Answer 2

The Flink equivalent of JavaSparkContext.parallelize() is ExecutionEnvironment.fromCollection().
So your snippet would translate to:

// get execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// create data set from collection
DataSet<Integer> input = env.fromCollection(Arrays.asList(init_val));
// apply map function
DataSet<Integer> result = input.map(new MapFunction<Integer, Integer>() {
  @Override
  public Integer map(Integer s) throws InterruptedException {
    Thread.sleep(s * 1000L);
    return 0;
  }
}).setParallelism(parallel); // set the parallelism of the map operator
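For completeness, here is a sketch of the snippet above as a self-contained program, under assumptions not in the original: the class name is illustrative, init_val and parallel are given small literal values so the job finishes quickly, and the result is printed to trigger execution.

```java
import java.util.Arrays;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class ParallelWorkload {
  public static void main(String[] args) throws Exception {
    int init_val = 1;  // seconds each task sleeps (illustrative)
    int parallel = 4;  // desired parallelism (illustrative)

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Integer> result = env
        .fromCollection(Arrays.asList(init_val))
        .map(new MapFunction<Integer, Integer>() {
          @Override
          public Integer map(Integer s) throws InterruptedException {
            Thread.sleep(s * 1000L); // simulate s seconds of work
            return 0;                // every element maps to 0
          }
        })
        .setParallelism(parallel);

    result.print(); // print() triggers execution of the DataSet program
  }
}
```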
