我正试图在java的flink 1.8.3中按照文档设置一个总体并行性设置:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
除此之外,我还有 setParallelism(2)
呼叫接收器和源。
我还可以在flinkui中看到应用了环境设置(长时间运行的会话集群、通过restapi或flinkui提交的作业):
但是当我查看flinkui中运行的各个阶段的并行度时,它们都是以parallelism 1运行的(除了source和sink,它们是以预期的并行度设置运行的):
我已经尝试过在单个操作符上设置并行性设置,但是它没有改变任何东西。算子是正规平面Map和滤波器。
这里没有配置什么让所有操作符都正确地遵守并行设置?我不能假设设置环境级别的并行性将自动应用于所有操作符吗?i、 设置平行度时,我是否也需要注意其他东西?
1条答案
按热度按时间xqk2d5yq1#
我“修复”了这个问题,没有尝试从flink作业代码内部更改并行设置,而是在启动flink作业时传递了一个并行设置。这不仅可以通过cli实现,还可以通过restapi和flinkui实现。我们现在一切正常。