我正在群集模式下运行spark流应用程序,每两分钟从文件系统读取一次文件。我有两个独立的操作块,它们是在缓存流上使用上次读取文件中的数据执行的。
JavaDStream<String> savedReads = reads.map(new ToFasta()).cache();
//Block 1
JavaDStream<LastResult> lastResults = savedReads.transform(new PipeToLast(lastDatabase))
.map(new ToLastResult(lastDatabase));
JavaEsSparkStreaming.saveToEs(lastResults, esIndexPrefix+"lastresults");
//Block 2
JavaDStream<String> centrifugeResults = savedReads.transform(new PipeToCentrifuge(centrifugeDatabasePath))
.map(new ToCentrifugeResult())
.filter(x -> x!=null);
JavaEsSparkStreaming.saveToEs(centrifugeResults, esIndexPrefix+"results");
我希望这两个块并行执行,因为否则批处理时间远远超过新文件到达的时间。我已经了解了公平调度模式和多个作业执行池的可能性。但我不知道如何将它用于数据流。
我使用java api和池调度,如下所示: stream.context().sparkContext().setLocalProperty("spark.scheduler.pool", "fair_pool");
如何实现两个块的作业在不同的池中并行执行而不是在同一个池中执行?
暂无答案!
目前还没有任何答案,快来回答吧!