Apache Spark 维护分区内和分区间的顺序

xwbd5t1u  于 2023-06-24  发布在  Apache
关注(0)|答案(3)|浏览(123)

我有一个已排序的pysparkdataframe

Col1 Col2
1     A
2     A
3     A
4     B
5     B
6     B

我希望所有的A都在一个文件中,B在另一个文件中,同时保持上面的确切顺序。
当我用下面的语句来写这个:

Data.coalesce(1).write.mode(“overwrite”).partitionBy(“col2”).csv(mypath)

Pyspark打乱每个单独文件中的数据,从而更改已排序dataframe的顺序。
有人能帮忙吗?
注意:使用shuffleWithinpartition可能是不可能的,因为我能够使用复杂的逻辑在上面的表中创建这个排序顺序,而不仅仅是一个列。上表只是一个虚拟示例。

1l5u6lss

1l5u6lss1#

我认为你可以使用重新分配方法而不是合并。repartition允许您指定输出分区的数量,您可以将其设置为Col2中不同值的数量,以确保Col2的每个值都在单独的分区中。下面是一个例子:

num_partitions = Data.select("Col2").distinct().count()
Data.repartition(num_partitions, 
"Col2").write.mode("overwrite").partitionBy("Col2").csv(mypath)
a5g8bdjr

a5g8bdjr2#

我认为如果按如下方式使用,数据将被排序,每个文件将在一个文件夹中:

num_partitions = Data.select("Col2").distinct().count()
sorted_data = Data.repartition(num_partitions, "Col2").orderBy("Col2")
coalesced_data = sorted_data.coalesce(1)
coalesced_data.write.mode("overwrite").partitionBy("Col2").csv(mypath)
fgw7neuy

fgw7neuy3#

@ind_1617:
通过在sorted_df上添加coalesce(1),可以看到生成的dataframe在作为CSV文件写入之前被合并到一个分区中。

from pyspark.sql.functions import monotonically_increasing_id

    df_with_id = df.withColumn("id", monotonically_increasing_id())
    repartitioned_df = df_with_id.repartition("col2")
    sorted_df = repartitioned_df.orderBy("id")

    # Coalesce(1) statement added here
    coalesced_df = sorted_df.coalesce(1)

    coalesced_df.write.mode("overwrite").partitionBy("col2").csv(mypath)

相关问题