我使用下面的scala代码来运行我的flink流媒体作业
val mystream = StreamExecutionEnvironment.getExecutionEnvironment
mystream.addSource(new mySource(params))
.map(new myMap(params))
.addSink(new mySink(params)).setParallelism(1)
mystream.setParallelism(1)
mystream.execute("My Streaming")
当我使用 flink run -p 1
,并行度为1(不知道-p是否有效或代码是否有效)。当我使用纯java来运行时(在我的想法中,我假设它在纯java中运行),并行度通常是5,这表明我的代码不起作用。如何控制?
正如上面的回答所建议的,下面的代码也不起作用,还有5的并行性。
val mystream = StreamExecutionEnvironment.getExecutionEnvironment
mystream.addSource(new mySource(params))
.map(new myMap(params))
.addSink(new mySink(params))
mystream.setParallelism(1)
mystream.execute("My Streaming")
1条答案
按热度按时间mefy6pfw1#
您可以在环境上设置默认的并行性。
使用
.addSink(new mySink(params)).setParallelism(1)
重写特定运算符的默认并行性。