我想使用spark将一个大的(51gb)xml文件(在一个外部hdd上)读入一个Dataframe(使用sparkxml插件),进行简单的Map/过滤,重新排序,然后将其作为csv文件写回磁盘。
但我总是得到一个 java.lang.OutOfMemoryError: Java heap space
不管我怎么调整。
我想了解为什么增加分区数不能阻止oom错误
它不应该把任务分成更多的部分,这样每个部分都更小,不会引起记忆问题吗?
(spark不可能把所有东西都塞进内存,如果不合适就崩溃,对吧??)
我尝试过的事情:
读取和写入Dataframe时(初始值为1604),重新分区/合并到(5000和10000个分区)Dataframe
使用较少数量的执行器(6,4,即使有2个执行器,我也会得到oom错误!)
减小分割文件的大小(默认值为33mb)
给我几吨公羊(我所有的)
增加 spark.memory.fraction
设置为0.8(默认值为0.6)
减少 spark.memory.storageFraction
设置为0.2(默认值为0.5)
套 spark.default.parallelism
到30和40(我的默认值是8)
套 spark.files.maxPartitionBytes
至64m(默认为128m)
我所有的代码都在这里(注意我没有缓存任何东西):
val df: DataFrame = spark.sqlContext.read
.option("mode", "DROPMALFORMED")
.format("com.databricks.spark.xml")
.schema(customSchema) // defined previously
.option("rowTag", "row")
.load(s"$pathToInputXML")
println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n")
// prints 1604
// i pass `numPartitions` as cli arguments
val df2 = df.coalesce(numPartitions)
// filter and select only the cols i'm interested in
val dsout = df2
.where( df2.col("_TypeId") === "1" )
.select(
df("_Id").as("id"),
df("_Title").as("title"),
df("_Body").as("body"),
).as[Post]
// regexes to clean the text
val tagPat = "<[^>]+>".r
val angularBracketsPat = "><|>|<"
val whitespacePat = """\s+""".r
// more mapping
dsout
.map{
case Post(id,title,body,tags) =>
val body1 = tagPat.replaceAllIn(body,"")
val body2 = whitespacePat.replaceAllIn(body1," ")
Post(id,title.toLowerCase,body2.toLowerCase, tags.split(angularBracketsPat).mkString(","))
}
.orderBy(rand(SEED)) // random sort
.write // write it back to disk
.option("quoteAll", true)
.mode(SaveMode.Overwrite)
.csv(output)
笔记
输入分割非常小(仅33mb),所以为什么我不能每个8个线程处理一个分割呢?这真的不应该打击我的记忆
更新我已经写了一个简短版本的代码,只读取文件,然后foreachpartition(println)。
我得到同样的错误:
val df: DataFrame = spark.sqlContext.read
.option("mode", "DROPMALFORMED")
.format("com.databricks.spark.xml")
.schema(customSchema)
.option("rowTag", "row")
.load(s"$pathToInputXML")
.repartition(numPartitions)
println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n")
df
.where(df.col("_PostTypeId") === "1")
.select(
df("_Id").as("id"),
df("_Title").as("title"),
df("_Body").as("body"),
df("_Tags").as("tags")
).as[Post]
.map {
case Post(id, title, body, tags) =>
Post(id, title.toLowerCase, body.toLowerCase, tags.toLowerCase))
}
.foreachPartition { rdd =>
if (rdd.nonEmpty) {
println(s"HI! I'm an RDD and I have ${rdd.size} elements!")
}
}
p、 答:我用的是sparkv2.1.0。我的机器有8个内核和16gb内存。
3条答案
按热度按时间vulvrdjw1#
我在运行sparkshell时遇到了这个错误,因此我将驱动程序内存增加到了一个很高的数字。然后我就可以加载xml了。
资料来源:https://github.com/lintool/warcbase/issues/246#issuecomment-249272263
6rqinv9w2#
因为您要存储rdd两次,并且您的逻辑必须这样更改,或者使用sparksql进行过滤
iugsix8n3#
可以通过在环境变量中添加以下内容来更改堆大小:
环境变量名:\u java\u options
环境变量值:-xmx512m-xms512m