pyspark中的多列重划分

628mspwn  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(233)

我正在尝试重新分区和保存我的Dataframe,其中包含约2000万条记录到多个csv文件。 df.repartition('col1','col2','col3').write.csv(path) 我想把它保存到许多csv文件有独特的组合 ('col1', 'col2', 'col3') ,有时可能在4000左右。
我尝试过的方法:
我尝试显式地将shuffle分区值设置为4000 spark.conf.set("spark.sql.shuffle.partitions", 4000) 尝试执行分组方式并将分区号设置为组数。 partitioned = final_df.groupBy('col1','col2','col3').count() partition_no = partitioned.count() spark.conf.set("spark.sql.shuffle.partitions", 4000) 两种方法都产生了相同的结果。文件数小于分区数。如何确保保存的csv文件数与分区数相同?
感谢您的帮助。

vxf3dgd4

vxf3dgd41#

如果csv可以在磁盘上进行分区,则可以这样做:

df = spark.createDataFrame(
    [(x, x + 1, x - 1, "a") for x in range(1, 4)], ["col1", "col2", "col3", "col4"]
)

df.show()
+----+----+----+----+                                                           
|col1|col2|col3|col4|
+----+----+----+----+
|   1|   2|   0|   a|
|   2|   3|   1|   a|
|   3|   4|   2|   a|
+----+----+----+----+
df.repartition("col1", "col2", "col3").write.partitionBy("col1", "col2", "col3").csv(
    "test.csv", mode="overwrite"
)

它将在磁盘上生成如下csv:

.
├── _SUCCESS
├── col1=1
│   └── col2=2
│       └── col3=0
│           └── part-00001-76768c63-d6a6-4d8c-b5a2-8c0f4131d825.c000.csv
├── col1=2
│   └── col2=3
│       └── col3=1
│           └── part-00002-76768c63-d6a6-4d8c-b5a2-8c0f4131d825.c000.csv
└── col1=3
    └── col2=4
        └── col3=2
            └── part-00000-76768c63-d6a6-4d8c-b5a2-8c0f4131d825.c000.csv

然而,像这样的嵌套目录通常并不理想。另一个选项是创建一个新列,它是col1-col3的串联,然后使用 repartition 以及 partitionBy 在那个新专栏上。它将压平上面列出的目录结构。

相关问题