我使用JUnit5运行了一个小测试用例
public class APipelineTest {
@ClassRule
public static MiniClusterWithClientResource flinkCluster =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(1)
.setNumberTaskManagers(1)
.build());
@Test
public void shouldRun() throws Exception {
var env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2); // <- only works when parallelism is set to 1
// if set >1, the app hangs, and prints nothing
var src = env.fromElements(1, 2, 3, 4, 5, 6);
src.map(i -> {
System.out.println("map: " + i);
return i;
});
env.execute();
}
}
字符串
我不明白为什么应用程序在env.setParallelism(1)
时工作,并且当并行度设置为大于1时没有打印出来。
我曾经看到过类似的现象,当水印在并行的有状态示例之间不对齐时,导致计算无法触发,表示为卡住的应用程序,但对于这样一个简单的情况,我无法连接如何使用未对齐的水印来解释这种情况。
1条答案
按热度按时间jchrr9hc1#
您创建了一个带有一个任务管理器和每个任务管理器一个插槽的Flink MiniCluster,因此您的作业的最大并行度为1。
字符串