我正在用pyspark编写spark中的批处理程序。以下是输入文件及其大小
base-track.dat (3.9g)
base-attribute-link.dat (18g)
base-release.dat (543m)
这些是文本文件,每行一条记录,每个字段用一个特殊字符分隔(参见代码)
我正在对属性链接执行一些过滤操作,对它们进行分组并与其他表连接。
我通过spark提交这个程序,提交到一个hadoop集群,由ambari管理9个数据节点。每个数据节点包含140 gb的ram和3.5 tb的磁盘空间。
下面是我的Pypark代码
import sys
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
if __name__ == "__main__":
sc = SparkContext(appName = "Tracks")
sqlContext = SQLContext(sc)
#Load base-track
track = sc.textFile("base-track/input").map(lambda row: row.split(u'\u0001'))
#Load base-attribute-link
attlnk = sc.textFile("base-attribute-link/input").map(lambda row: row.split(u'\u0001'))
#Load base-release
release = sc.textFile("base-release/input").map(lambda row: row.split(u'\u0001'))
attlnk = attlnk.filter(lambda row: row[2] == 'MA0000000162')
attlnkg = attlnk.groupBy(lambda row: row[1])
attlnkmax = attlnkg.map( lambda t: (t[0],max([v[4] for v in t[1]])) )
alg = attlnkmax.map(lambda r: Row(al_objectid=r[0],al_value=r[1]))
aldf = alg.toDF()
track = track.map(lambda r:Row(t_tag = r[0], t_trackid= r[1], t_releaseid= r[2], t_songid = r[3], t_med= r[4], t_ph = r[5], t_tn = r[5], t_title= r[5], t_part= r[6], t_dur = r[7], t_pick = r[8], t_amgclid = r[9], t_amgpopid = r[10], t_compid = r[11], t_muzid = r[12], t_perfid= r[13], t_albumid = r[14]))
trackdf = track.toDF()
release = release.map(lambda r:Row(r_tag = r[0], r_relid = r[1], r_albumid = r[2], r_mediafmtid = r[3], r_prodfmtid = r[4], r_reldate = r[5], r_prodcode = r[6], r_prodtypeid = r[7], r_label = r[8], r_relyear = r[9], r_ispurch = r[10], r_amgclassid = r[11], r_amgpopid = r[12], r_eanid = r[13], r_upcid = r[14]))
releasedf = release.toDF()
trackaldf = trackdf.join(aldf, trackdf['t_trackid'] == aldf['al_objectid'], 'left_outer')
tracksdf = trackaldf.join(releasedf, trackaldf['t_releaseid'] == releasedf['r_relid'])
tracksdf = tracksdf.select('t_trackid', 't_releaseid', 't_songid', 't_med', 't_ph', 't_tn', 't_title', 't_part', 't_dur', 't_pick', 't_amgclid', 't_amgpopid', 't_compid', 't_muzid', 'al_objectid', 't_perfid', 't_albumid', 'r_label')
tracksdf.rdd.map(lambda x: u"\u0001".join(map(str, x))).coalesce(100).saveAsTextFile("tracks-out")
当它试图执行此操作时,出现以下一系列错误。
ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /tmp/blockmgr-d88c631e-cec3-4b83-8af6-a38b109b5e3b/0e/temp_shuffle_7dbda3ac-48b1-4c4a-89c7-64eb5d858d90
java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:326)
at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at org.xerial.snappy.SnappyOutputStream.flush(SnappyOutputStream.java:336)
at org.apache.spark.io.SnappyOutputStreamWrapper.flush(CompressionCodec.scala:209)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:141)
at java.io.DataOutputStream.flush(DataOutputStream.java:123)
at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.flush(UnsafeRowSerializer.scala:83)
at org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$revertPartialWritesAndClose$1.apply$mcV$sp(DiskBlockObjectWriter.scala:157)
at org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$revertPartialWritesAndClose$1.apply(DiskBlockObjectWriter.scala:154)
at org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$revertPartialWritesAndClose$1.apply(DiskBlockObjectWriter.scala:154)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1239)
at org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(DiskBlockObjectWriter.scala:161)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.stop(BypassMergeSortShuffleWriter.java:232)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
这里有几个问题,这里和这里都和同一个问题有关。
下面是我从以上两个问题中尝试的。我试图将spark.yarn.executor.memoryoverhead从384 mb增加到4gb。
SPARK_JAVA_OPTS+=" -Dspark.local.dir=/mnt/spark,/mnt2/spark -Dhadoop.tmp.dir=/mnt/ephemeral-hdfs"
export SPARK_JAVA_OPTS
第一个没有任何效果。如果我添加java选项,/mnt目录将不存在。
在多个论坛(包括databricks)上读到这个问题之后,我有了一个模糊的想法,即这个作业试图创建临时文件,作为每个集群节点的shuffle-on/tmp的一部分,并耗尽空间。在每个集群节点上,我们为tmp目录所在的根(/)分区分配了100gb。
我已经花了一个多月的时间通过使用各种spark配置参数来执行这个命令。作为调整的一部分,我将spark.driver和spark.executor内存增加到16g,后来又增加到64g。还增加了Spark线执行器内存到4gb。不幸的是,这些都不能解决太空问题。
任何关于如何进一步进行的指导都会大有帮助。
[edit-1]我刚才检查了所有机器上根目录的磁盘空间,我们群集中的9个节点中有7个为根目录分配了100 gb以上的空间,但是在2个节点上只分配了10 gb,只剩下6 gb以上的空间。这可能会导致磁盘空间的问题,我将不得不与我们的it团队检查,如果根目录的大小可以扩展。
[edit-2]我与it团队合作,将所有计算机上的根分区大小扩展到100+gb,但问题仍然存在,可能100gb/tmp空间也不足以完成此任务。我估计这个作业的输出大约为4.6gb。
2条答案
按热度按时间6fe3ivhb1#
我发现我并没有将spark作业提交给集群,而是提交给一台机器,从而导致了磁盘空间问题。我总是用下面的方式提交我的剧本
因为我希望我的脚本在hadoop集群上执行,并使用yarn作为资源管理器,所以我将submit命令改为以下命令,然后就可以正常工作了。
new9mtju2#
考虑到错误的性质,以及您正在对数十gb的数据执行大型联接(spark workers会在数据洗牌时将中间数据写入磁盘),100 gb的磁盘似乎不够。我建议为默认的worker目录和local目录分配更多的磁盘,方法是将它们装载到更大的磁盘上,或者配置更大的根磁盘。另外,请注意,如果spark未正确关闭,则此中间数据可能会延迟,并占用工作节点上的大量空间。因此,您可能需要检查这些目录并删除所有过时的文件。如果您在awsr3、c3或类似的示例类型上运行spark standalone,并且使用大型临时ssd磁盘,我建议将这些磁盘安装为“mnt”和“mnt2”,并将spark scratch space配置为指向这些安装,而不是(通常)较小的根卷。例如: