我需要在每个前缀中输出一个唯一的文件,所以代码是这样写的 ds.repartition(1).write.partitionBy("prefix").mode(SaveMode.Overwrite).csv(output)
以前的代码没有添加重分区,每个前缀都会有上千个文件,而且任务可以在2小时内完成。添加重分区后,每个前缀将有1个文件,任务将执行7个小时以上。重新划分是在什么阶段执行的?我用这个很优雅吗?
我需要在每个前缀中输出一个唯一的文件,所以代码是这样写的 ds.repartition(1).write.partitionBy("prefix").mode(SaveMode.Overwrite).csv(output)
以前的代码没有添加重分区,每个前缀都会有上千个文件,而且任务可以在2小时内完成。添加重分区后,每个前缀将有1个文件,任务将执行7个小时以上。重新划分是在什么阶段执行的?我用这个很优雅吗?
2条答案
按热度按时间js4nwp541#
无论何时进行重新分区,它都会进行一次完全的无序排列,并尽可能均匀地分布数据。在您的例子中,当您执行ds.repartition(1)时,它洗牌所有数据,并将所有数据放在一个工作节点上的单个分区中。
现在,当您执行写操作时,只有一个工作节点/执行器在按前缀分区之后执行写操作。因为只有一个工人在做这项工作,所以要花很多时间。
你可以考虑的一些事情:
如果没有真正的理由只有一个csv文件,尽量避免这样做。
使用coalesce(1)代替repartition(1),coalesce(1)将执行最小洗牌,而不是repartition(1),repartition(1)将执行完全洗牌。
保存一个csv文件,你没有利用spark的并行能力。
wbgh16ku2#
如果要使用前缀作为分区列,则需要运行
您可以使用coalesce(1)而不是repartition(1),因为在这种情况下,coalesce不洗牌,repartition有洗牌,分区是一个,然后只有一个任务来处理所有数据。所以花了7个小时。