spark重新划分未给出预期结果

t3psigkw  于 2021-05-27  发布在  Spark
关注(0)|答案(3)|浏览(358)

我想根据x列重新划分sparkDataframe。假设x列有3个不同的值(x1,x2,x3)。不同值的数目可能不同。
我想要一个分区包含只有1个x值的记录。我想要3个分区,其中1个分区有x=x1的记录,另一个分区有x=x2,最后一个分区有x=x3。
我从dataframe查询得到x的唯一值

val uniqueList = DF.select("X").distinct().map(x => x(0).toString).collect()

正确给出唯一值的列表。
以及我正在做的重新划分

DF = DF.repartition(uniqueList.length, col('X'))

然而,我在df中的分区并不像预期的那样出现。数据分布不正确,因为一个分区为空,第二个分区包含x1的记录,第三个分区同时包含x2和x3的记录。
如果我遗漏了什么,有人能帮忙吗。
编辑:
我的列x可以有不同数量的唯一值。它可以有3个或3000个唯一值。如果我在下面

DF = DF.repartition(col('X'))

我将只得到200个分区,因为这是spark.sql.shuffle.partitions的默认值。所以我给出了分区的数目
如果有3000个唯一的x值,那么我想重新划分df,这样就有3000个分区,每个分区包含一个特定x值的记录。这样我就可以运行mappartition并并行处理每个分区。

mwkjh3gx

mwkjh3gx1#

重分区是基于散列分区的(以分区键的散列码模分区个数为准),所以每个分区是否只有一个值纯粹是偶然的。
如果可以将每个分区键Map到一个唯一的 Int 在0到(唯一值的数目-1)的范围内,因为 Int 在scala中,这将确保如果分区的数量至少与唯一值的数量相同,则没有分区具有多个不同的分区键值。
也就是说,给这些 Int s本质上是不可并行的,需要进行顺序扫描或提前知道不同的值。
概率上,一个特定值散列到(n个分区)的特定分区的概率是1/n。当n相对于不同值的数量增加时,没有一个分区具有多个不同值的可能性增加(在限制条件下,如果可以有2^32个分区,则几乎所有分区都为空,但实际的哈希冲突仍会保证分区中有多个不同值)。因此,如果您可以容忍空分区,那么选择一个足够大的分区数(大于不同值的数目)将减少出现次理想结果的可能性。

piv4azn7

piv4azn72#

这有用吗?

val repartitionedDF = DF.repartition(col("X"))

下面是来自partitionby博客文章的示例。
数据:

first_name,last_name,country
Ernesto,Guevara,Argentina
Vladimir,Putin,Russia
Maria,Sharapova,Russia
Bruce,Lee,China
Jack,Ma,China

代码:

df
  .repartition(col("country"))
  .write
  .partitionBy("country")
  .parquet(outputPath)

文件系统输出:

partitioned_lake1/
  country=Argentina/
    part-00044-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet
  country=China/
    part-00059-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet
  country=Russia/
    part-00002-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet
b4wnujal

b4wnujal3#

顺便问一下,您的列x是否包含空值?然后spark尝试为此创建一个分区。由于您也将分区数指定为int,因此可能是spark试图挤压x2和x3。因此,您可以尝试两种方法-只需提供列名以进行修复(仍然有一个额外的分区),或者尝试从x中删除空值(如果存在的话)。

相关问题