如何在纯java中设置flink并行(idea)

r1wp621o  于 2021-06-26  发布在  Flink
关注(0)|答案(1)|浏览(410)

我使用下面的scala代码来运行我的flink流媒体作业

val mystream = StreamExecutionEnvironment.getExecutionEnvironment
    mystream.addSource(new mySource(params))
      .map(new myMap(params))
      .addSink(new mySink(params)).setParallelism(1)
    mystream.setParallelism(1)
    mystream.execute("My Streaming")

当我使用 flink run -p 1 ,并行度为1(不知道-p是否有效或代码是否有效)。当我使用纯java来运行时(在我的想法中,我假设它在纯java中运行),并行度通常是5,这表明我的代码不起作用。如何控制?
正如上面的回答所建议的,下面的代码也不起作用,还有5的并行性。

val mystream = StreamExecutionEnvironment.getExecutionEnvironment
    mystream.addSource(new mySource(params))
      .map(new myMap(params))
      .addSink(new mySink(params))
    mystream.setParallelism(1)
    mystream.execute("My Streaming")
mefy6pfw

mefy6pfw1#

您可以在环境上设置默认的并行性。

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.addSource(...)

使用 .addSink(new mySink(params)).setParallelism(1) 重写特定运算符的默认并行性。

相关问题