scala—将多个文件作为独立的rdd并行处理

kdfy810k  于 2021-06-02  发布在  Hadoop
关注(0)|答案(3)|浏览(442)

我有一个场景,其中一定数量的操作(包括groupby)必须应用于许多小文件(每个文件约300mb)。手术看起来像这样。。 df.groupBy(....).agg(....) 现在要在多个文件上处理它,我可以使用通配符“/**/*.csv”,但是,它会创建一个rdd并将其分区以用于操作。但是,从操作上看,这是一个分组操作,并且涉及大量的洗牌,如果文件相互排斥,则洗牌是不必要的。
我所看到的是,我可以在文件上创建独立的rdd并独立地对它们进行操作。

h43kikqp

h43kikqp1#

这是一个想法,而不是一个完整的解决方案,我还没有测试它。
您可以从将数据处理管道提取到函数中开始。

def pipeline(f: String, n: Int) = {
    sqlContext
        .read
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .load(f)
        .repartition(n)
        .groupBy(...)
        .agg(...)
        .cache // Cache so we can force computation later
}

如果你的文件很小,你可以调整 n 参数使用尽可能少的分区来容纳单个文件中的数据并避免乱序。这意味着您正在限制并发性,但我们稍后将继续讨论此问题。

val n: Int = ???

接下来,您必须获得输入文件的列表。此步骤取决于数据源,但大多数情况下,它或多或少是简单的:

val files: Array[String] = ???

接下来,您可以使用 pipeline 功能:

val rdds = files.map(f => pipeline(f, n))

由于我们将并发限制在单个文件的级别,因此我们希望通过提交多个作业来补偿。让我们添加一个简单的助手,强制求值并用 Future ```
import scala.concurrent._
import ExecutionContext.Implicits.global

def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = future {
df.rdd.foreach(_ => ()) // Force computation
df
}

最后我们可以在 `rdds` :

val result = Future.sequence(
rdds.map(rdd => pipelineToFuture(rdd)).toList
)

根据您的要求,您可以添加 `onComplete` 回调或使用React流来收集结果。
s3fp2yjn

s3fp2yjn2#

这样我们就可以并行地编写多个rdd

public class ParallelWriteSevice implements IApplicationEventListener {

    private static final IprogramLogger logger = programLoggerFactory.getLogger(ParallelWriteSevice.class);

    private static ExecutorService executorService=null;
    private static List<Future<Boolean>> futures=new ArrayList<Future<Boolean>>();

    public static void submit(Callable callable) {
        if(executorService==null)
        {
            executorService=Executors.newFixedThreadPool(15);//Based on target tables increase this
        }

        futures.add(executorService.submit(callable));
    }

    public static boolean isWriteSucess() {
        boolean writeFailureOccured = false;
        try {
            for (Future<Boolean> future : futures) {
                try {
                    Boolean writeStatus = future.get();
                    if (writeStatus == false) {
                        writeFailureOccured = true;
                    }
                } catch (Exception e) {
                    logger.error("Erorr - Scdeduled write failed " + e.getMessage(), e);
                    writeFailureOccured = true;
                }
            }
        } finally {
            resetFutures();         
              if (executorService != null) 
                  executorService.shutdown();
              executorService = null;

        }
        return !writeFailureOccured;
    }

    private static void resetFutures() {
            logger.error("resetFutures called");
            //futures.clear();
    }

}
pxiryf3j

pxiryf3j3#

如果你有很多文件,而且每个文件都很小(你说300mb以上,我认为spark是小的),你可以尝试使用 SparkContext.wholeTextFiles 它将创建一个rdd,其中每个记录都是一个完整的文件。

相关问题