Spark:并行转换多个 Dataframe

dldeef67  于 2023-03-13  发布在  Apache
关注(0)|答案(2)|浏览(105)

了解如何在并行转换多个 Dataframe 时实现最佳并行性
我有一个路径数组

val paths = Array("path1", "path2", .....

我从每个路径加载 Dataframe ,然后转换并写入目标路径

paths.foreach(path => {
  val df = spark.read.parquet(path)
  df.transform(processData).write.parquet(path+"_processed")
})

转换processData与我正在加载的 Dataframe 无关。
这限制了我一次只能处理一个 Dataframe ,而且我的大部分集群资源都是空闲的,因为处理每个 Dataframe 是独立的,所以我把scala的Array转换成了ParArray

paths.par.foreach(path => {
  val df = spark.read.parquet(path)
  df.transform(processData).write.parquet(path+"_processed")
})

现在它在集群中使用了更多的资源,我仍然在尝试理解它是如何工作的,以及如何微调这里的并行处理
1.如果我使用ForkJoinPool增加默认的scala并行度到更高的数值,会不会导致更多的线程在驱动端产生,并将处于锁定状态等待foreach函数完成,最终杀死驱动?
1.它如何影响像EventLoggingListnener这样的集中式Spark,因为多个 Dataframe 是并行处理的,所以EventLoggingListnener需要处理更多的事件流入。
1.要实现最佳资源利用率,我需要考虑哪些参数。
1.任何其他方法
我可以通过任何资源来了解这种扩展,这将非常有帮助

dfty9e19

dfty9e191#

速度慢的原因是Spark非常擅长并行计算存储在一个大 Dataframe 中的大量数据。它在处理大量的 Dataframe 方面非常糟糕。2它将使用所有的执行器在一个 Dataframe 上开始计算(即使它们并不都需要),并等待它完成后再开始下一个。这会导致大量的非活动处理器。这很糟糕,但"这不是Spark设计的目的。
我有一个诀窍给你。可能需要改进一点,但你会有想法。下面是我会做的。从一个路径列表中,我会提取parquet文件的所有模式,并创建一个新的大模式来收集所有列。然后,我将要求spark读取使用此模式的所有parquet文件(不存在的列将被自动设置为空)。然后,我将联合所有 Dataframe ,并在这个大 Dataframe 上执行转换,最后使用partitionBy将 Dataframe 存储在单独的文件中,同时仍然并行地进行所有的工作。它看起来是这样的。

// let create two sample datasets with one column in common (id)
// and two different columns x != y
val d1 = spark.range(3).withColumn("x", 'id * 10)
d1.show
+---+----+
| id|  x |
+---+----+
|  0|   0|
|  1|  10|
|  2|  20|
+---+----+

val d2 = spark.range(2).withColumn("y", 'id cast "string")
d2.show
+---+---+
| id|  y|
+---+---+
|  0|  0|
|  1|  1|
+---+---+

// And I store them
d1.write.parquet("hdfs:///tmp/d1.parquet")
d2.write.parquet("hdfs:///tmp/d2.parquet")
// Now let's create the big schema
val paths = Seq("hdfs:///tmp/d1.parquet", "hdfs:///tmp/d2.parquet")
val fields = paths
    .flatMap(path => spark.read.parquet(path).schema.fields)
    .toSet //removing duplicates
    .toArray
val big_schema = StructType(fields)

// and let's use it
val dfs = paths.map{ path => 
    spark.read
        .schema(big_schema)
        .parquet(path)
        .withColumn("path", lit(path.split("/").last))
}

// Then we are ready to create one big dataframe
dfs.reduce( _ unionAll _).show
+---+----+----+----------+
| id|   x|   y|      file|
+---+----+----+----------+
|  1|   1|null|d1.parquet|
|  2|   2|null|d1.parquet|
|  0|   0|null|d1.parquet|
|  0|null|   0|d2.parquet|
|  1|null|   1|d2.parquet|
+---+----+----+----------+

然而,我不推荐在大量的 Dataframe 上使用unionAll,因为spark对执行计划的分析,它在处理大量的 Dataframe 时会非常慢,我会使用RDD版本,尽管它更冗长。

val rdds = sc.union(dfs.map(_.rdd))
// let's not forget to add the path to the schema
val big_df = spark.createDataFrame(rdds, 
    big_schema.add(StructField("path", StringType, true)))
transform(big_df)
    .write
    .partitionBy("path")
    .parquet("hdfs:///tmp/processed.parquet")

看看我处理过的目录,我得到了这个:

hdfs:///tmp/processed.parquet/_SUCCESS
hdfs:///tmp/processed.parquet/path=d1.parquet
hdfs:///tmp/processed.parquet/path=d2.parquet
2ic8powd

2ic8powd2#

你应该考虑一些变量,最重要的是:CPU内核,每个DF的大小和一点未来的使用。建议是决定每个DF的优先级处理。你可以使用公平配置,但这是不够的,并行处理可能会占用你的集群的一大部分。你必须分配优先级的DF和使用未来池来控制并行作业的数量运行在你的应用程序。

相关问题