我想根据x列重新划分sparkDataframe。假设x列有3个不同的值(x1,x2,x3)。不同值的数目可能不同。
我想要一个分区包含只有1个x值的记录。我想要3个分区,其中1个分区有x=x1的记录,另一个分区有x=x2,最后一个分区有x=x3。
我从dataframe查询得到x的唯一值
val uniqueList = DF.select("X").distinct().map(x => x(0).toString).collect()
正确给出唯一值的列表。
以及我正在做的重新划分
DF = DF.repartition(uniqueList.length, col('X'))
然而,我在df中的分区并不像预期的那样出现。数据分布不正确,因为一个分区为空,第二个分区包含x1的记录,第三个分区同时包含x2和x3的记录。
如果我遗漏了什么,有人能帮忙吗。
编辑:
我的列x可以有不同数量的唯一值。它可以有3个或3000个唯一值。如果我在下面
DF = DF.repartition(col('X'))
我将只得到200个分区,因为这是spark.sql.shuffle.partitions的默认值。所以我给出了分区的数目
如果有3000个唯一的x值,那么我想重新划分df,这样就有3000个分区,每个分区包含一个特定x值的记录。这样我就可以运行mappartition并并行处理每个分区。
3条答案
按热度按时间mwkjh3gx1#
重分区是基于散列分区的(以分区键的散列码模分区个数为准),所以每个分区是否只有一个值纯粹是偶然的。
如果可以将每个分区键Map到一个唯一的
Int
在0到(唯一值的数目-1)的范围内,因为Int
在scala中,这将确保如果分区的数量至少与唯一值的数量相同,则没有分区具有多个不同的分区键值。也就是说,给这些
Int
s本质上是不可并行的,需要进行顺序扫描或提前知道不同的值。概率上,一个特定值散列到(n个分区)的特定分区的概率是1/n。当n相对于不同值的数量增加时,没有一个分区具有多个不同值的可能性增加(在限制条件下,如果可以有2^32个分区,则几乎所有分区都为空,但实际的哈希冲突仍会保证分区中有多个不同值)。因此,如果您可以容忍空分区,那么选择一个足够大的分区数(大于不同值的数目)将减少出现次理想结果的可能性。
piv4azn72#
这有用吗?
下面是来自partitionby博客文章的示例。
数据:
代码:
文件系统输出:
b4wnujal3#
顺便问一下,您的列x是否包含空值?然后spark尝试为此创建一个分区。由于您也将分区数指定为int,因此可能是spark试图挤压x2和x3。因此,您可以尝试两种方法-只需提供列名以进行修复(仍然有一个额外的分区),或者尝试从x中删除空值(如果存在的话)。