背景:在hadoop流中,每个reduce作业在完成时都会写入hdfs,从而为hadoop集群执行下一个reduce扫清了道路。
我很难将这个范例Map到(py)spark。
举个例子,
df = spark.read.load('path')
df.rdd.reduceByKey(my_func).toDF().write.save('output_path')
当我运行这个程序时,集群在将数据写入磁盘之前收集Dataframe中的所有数据。至少在我观察工作进展的时候,事情看起来是这样的。
我的问题是,我的数据比我的集群内存大得多,所以在写入任何数据之前,我的内存就用完了。在hadoop流媒体中,我们没有这个问题,因为输出数据流到磁盘,为后续的数据批处理腾出空间。
我考虑过这样的事情:
for i in range(100):
(df.filter(df.loop_index==i)
.rdd
.reduceByKey(my_func)
.toDF()
.write.mode('append')
.save('output_path'))
在每次迭代中,我只处理数据的一个子集。但这看起来很困难,主要是因为我要么坚持 df
,这是不可能的,因为内存限制,或者我必须在每次迭代中重新读取输入hdfs源。
使循环工作的一种方法是按日期或其他数据子集对源文件夹进行分区。但是为了这个问题,让我们假设这是不可能的。
问题:我如何在Pypark中运行这样的工作?我需要一个更大的集群吗?如果是这样,在处理数据之前调整集群大小的常见做法是什么?
1条答案
按热度按时间smdnsysy1#
在大量分区中重新划分数据可能会有所帮助。下面的示例类似于for循环,不过您可能希望先尝试使用较少的分区
您还应该查看当前使用的执行者的数量(
--num-executors
). 减少这个数字也会减少内存占用。