根据现有列中的值将Spark DataFrame分区为选定数量的分区

35g0bw71  于 2023-08-06  发布在  Apache
关注(0)|答案(2)|浏览(146)

我想在写入文件之前,根据索引列将Spark DataFrame分区为偶数个分区。我想控制有多少分区创建的基础上的DataFrame的大小,然后使用时,写入到 parquet 文件使用partitionBy
有一个示例DataFrame:

i     b
 0    11
 1     9
 2    13
 3     2
 4    15
 5     3
 6    14
 7    16
 8    11
 9     9
 10   17
 11   10

字符串
假设我想根据列i中的值创建4个分区,那么这些分区将对应于分配给列g的值:

g    i     b
0    0    11
0    1     9
0    2    13
1    3     2
1    4    15
1    5     3
2    6    14
2    7    16
2    8    11
3    9     9
3   10    17
3   11    10


在Spark中实现此操作的首选方法是什么?

t9eec4r0

t9eec4r01#

虽然文档看起来有点难以理解,并对这个问题做了一些假设-iidoEe。它会喜欢4个或更确切地说N个文件(?)作为输出,在列“i”中以升序方式表示id,特此我自己的Spark 2.4改编的示例,该示例采用20条记录并将其拆分为4个均匀分布的分区,然后将其写出。我们走吧:

val list = sc.makeRDD((1 to 20)).map((_, 1,"2019-01-01", "2019-01-01",1,2,"XXXXXXXXXXXXXXXXXXXXXXXXXX"))

val df = list.toDF("customer_id", "dummy", "report_date", "date", "value_1", "value_2", "dummy_string")
df.show(false)

字符串
仅显示几个条目:

+-----------+-----+-----------+----------+-------+-------+--------------------------+
|customer_id|dummy|report_date|date      |value_1|value_2|dummy_string              |
+-----------+-----+-----------+----------+-------+-------+--------------------------+
|1          |1    |2019-01-01 |2019-01-01|1      |2      |XXXXXXXXXXXXXXXXXXXXXXXXXX|
|2          |1    |2019-01-01 |2019-01-01|1      |2      |XXXXXXXXXXXXXXXXXXXXXXXXXX|
|3          |1    |2019-01-01 |2019-01-01|1      |2      |XXXXXXXXXXXXXXXXXXXXXXXXXX|
|4          |1    |2019-01-01 |2019-01-01|1      |2      |XXXXXXXXXXXXXXXXXXXXXXXXXX|
|5          |1    |2019-01-01 |2019-01-01|1      |2      |XXXXXXXXXXXXXXXXXXXXXXXXXX|
|6          |1    |2019-01-01 |2019-01-01|1      |2      |XXXXXXXXXXXXXXXXXXXXXXXXXX|
|7          |1    |2019-01-01 |2019-01-01|1      |2      |XXXXXXXXXXXXXXXXXXXXXXXXXX|
...


然后-包括一些额外的排序-这不是必要的,适用于所有格式:

df.repartitionByRange(4, $"customer_id")
  .sortWithinPartitions("customer_id", "date", "value_1")
  .write
  .parquet("/tmp/SOQ6")


这给出了4个文件,如下图所示:
x1c 0d1x的数据
您可以看到4个文件,第一个和最后一个零件的命名是显而易见的。运行:

val lines = spark.read.parquet("/tmp/SOQ6/part-00000-tid-2518447510905190948-a81455f6-6c0b-4e02-89b0-57dfddf1fb97-1200-c000.snappy.parquet")
val words = lines.collect
lines.count


显示5个记录,并且内容按照 Dataframe 连续排序。

lines: org.apache.spark.sql.DataFrame = [customer_id: int, dummy: int ... 5 more fields]
 words: Array[org.apache.spark.sql.Row] = Array([1,1,2019-01-01,2019-01-01,1,2,XXXXXXXXXXXXXXXXXXXXXXXXXX], [2,1,2019-01-01,2019-01-01,1,2,XXXXXXXXXXXXXXXXXXXXXXXXXX], [3,1,2019-01-01,2019-01-01,1,2,XXXXXXXXXXXXXXXXXXXXXXXXXX], [4,1,2019-01-01,2019-01-01,1,2,XXXXXXXXXXXXXXXXXXXXXXXXXX], [5,1,2019-01-01,2019-01-01,1,2,XXXXXXXXXXXXXXXXXXXXXXXXXX])
res11: Long = 5


对所有文件都做了检查,但只显示了一个。

最后评论

这是否是一个好主意就是另外一回事了。考虑非广播JOIN,这是一个问题。
此外,我显然不会硬编码4,而是应用一些N的公式来应用于partitionByRange!例如:

val N = some calculation based on counts in DF and your cluster 
val df2 = df.repartition(N, $"c1", $"c2")


您必须测试DF Writer,因为文档并不完全清楚。
检查了EMR群集,其中包含200万条记录,4个文件以及输出。

wixjitnu

wixjitnu2#

Spark中基于列和大小的DataFrame分区

步骤1:我们正在为此编写一个python函数

def partition_dataframe(df,col_name,col_name2,size1,size2):
df.repartition(2).write.partitionBy(col_name,col_name2).mode(“append”).保存(“/fileStore/tables/output_part”)return df

第二步:注册自定义项

partition_dataframe = udf(partition_dataframe)

第三步:传递参数给自定义项

partitioned_df = partition_dataframe(df,'device',' events',1024,1024)

第四步:使用python函数中添加的以下命令和位置检查分区文件

%fs ls /fileStore/tables/output_part_size1

**注意:**这里df为DataFrame,'device'和'events'为DataFrame中的列名,1024为分区大小,单位为KB。

我们可以根据需要修改函数,就像我们可以改变分区的数量一样,这里我们使用repartition(2)

相关问题