我使用spark runner进行模拟:
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline p = Pipeline.create(options);
p.apply(Create.of(1))
.apply(ParDo.of(new DoFn<Integer, Integer>() {
@ProcessElement
public void apply(@Element Integer element, OutputReceiver<Integer> outputReceiver) {
IntStream.range(0, 4_000_000).forEach(outputReceiver::output);
}
}))
.apply(Reshuffle.viaRandomKey())
.apply(ParDo.of(new DoFn<Integer, Integer>() {
@ProcessElement
public void apply(@Element Integer element, OutputReceiver<Integer> outputReceiver) {
try {
// simulate a rpc call of 10ms
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
outputReceiver.output(element);
}
}));
PipelineResult result = p.run();
result.waitUntilFinish();
我正在使用 --runner=SparkRunner --sparkMaster=local[8]
但重组后只使用了一个线程。为什么重新洗牌不起作用?
如果我为此改变改组:
.apply(MapElements.into(kvs(integers(), integers())).via(e -> KV.of(e % 8, e)))
.apply(GroupByKey.create())
.apply(Values.create())
.apply(Flatten.iterables())
然后我得到8线程运行。
比尔,拉斐尔。
1条答案
按热度按时间gtlvzcf81#
看起来beam-on-spark的重组归根结底是在
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/groupcombinefunctions.java#l191
我想知道在这种情况下
rdd.context().defaultParallelism()
以及rdd.getNumPartitions()
是1。我已经备案了https://issues.apache.org/jira/browse/beam-10834 调查。同时,您可以使用groupbykey来获得所需的并行性,如您所示(如果没有整数,可以尝试使用元素的哈希、math.random()或甚至递增计数器作为键)。