我想在写入文件之前,根据索引列将Spark DataFrame分区为偶数个分区。我想控制有多少分区创建的基础上的DataFrame的大小,然后使用时,写入到 parquet 文件使用partitionBy
。
有一个示例DataFrame:
i b
0 11
1 9
2 13
3 2
4 15
5 3
6 14
7 16
8 11
9 9
10 17
11 10
字符串
假设我想根据列i
中的值创建4个分区,那么这些分区将对应于分配给列g
的值:
g i b
0 0 11
0 1 9
0 2 13
1 3 2
1 4 15
1 5 3
2 6 14
2 7 16
2 8 11
3 9 9
3 10 17
3 11 10
型
在Spark中实现此操作的首选方法是什么?
2条答案
按热度按时间t9eec4r01#
虽然文档看起来有点难以理解,并对这个问题做了一些假设-iidoEe。它会喜欢4个或更确切地说N个文件(?)作为输出,在列“i”中以升序方式表示id,特此我自己的Spark 2.4改编的示例,该示例采用20条记录并将其拆分为4个均匀分布的分区,然后将其写出。我们走吧:
字符串
仅显示几个条目:
型
然后-包括一些额外的排序-这不是必要的,适用于所有格式:
型
这给出了4个文件,如下图所示:
x1c 0d1x的数据
您可以看到4个文件,第一个和最后一个零件的命名是显而易见的。运行:
型
显示5个记录,并且内容按照 Dataframe 连续排序。
型
对所有文件都做了检查,但只显示了一个。
最后评论
这是否是一个好主意就是另外一回事了。考虑非广播JOIN,这是一个问题。
此外,我显然不会硬编码4,而是应用一些N的公式来应用于partitionByRange!例如:
型
您必须测试DF Writer,因为文档并不完全清楚。
检查了EMR群集,其中包含200万条记录,4个文件以及输出。
wixjitnu2#
Spark中基于列和大小的DataFrame分区
步骤1:我们正在为此编写一个python函数
def partition_dataframe(df,col_name,col_name2,size1,size2):
df.repartition(2).write.partitionBy(col_name,col_name2).mode(“append”).保存(“/fileStore/tables/output_part”)return df
第二步:注册自定义项
partition_dataframe = udf(partition_dataframe)
第三步:传递参数给自定义项
partitioned_df = partition_dataframe(df,'device',' events',1024,1024)
第四步:使用python函数中添加的以下命令和位置检查分区文件
%fs ls /fileStore/tables/output_part_size1
**注意:**这里df为DataFrame,'device'和'events'为DataFrame中的列名,1024为分区大小,单位为KB。
我们可以根据需要修改函数,就像我们可以改变分区的数量一样,这里我们使用repartition(2)