When I run a Hive job, this failure occurs in the same stage and the same task every time:
16/08/29 03:45:27 WARN TaskSetManager: Lost task 296.1 in stage 10.0 (TID 22783, lvshdc2dn0501.lvs.paypal.com): java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.expandPointerArray(UnsafeInMemorySorter.java:115)
at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.insertRecord(UnsafeInMemorySorter.java:128)
at org.apache.spark.sql.execution.UnsafeKVExternalSorter.<init>(UnsafeKVExternalSorter.java:113)
at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter(UnsafeFixedWidthAggregationMap.java:257)
at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.switchToSortBasedAggregation(TungstenAggregationIterator.scala:435)
at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:379)
at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
I join two tables, group the result, and count within each group to get the final counts. The code is as follows:
import org.apache.spark.sql.{DataFrame, SQLContext}

override def monitor(sqlContext: SQLContext, date: String) = {
  // Hive settings intended to enable map-side aggregation and skew handling
  sqlContext.setConf("hive.map.aggr", "true")
  sqlContext.setConf("hive.groupby.skewindata", "true")
  // get data
  val targetDF = getLinked(date, sqlContext)
  // count what we need
  getTargetMap(sqlContext, targetDF)
}
def getLinked(date: String, sqlContext: SQLContext): DataFrame = {
  // distinct ids from table l
  val l = sqlContext.read.parquet(get_l_path(date)).select("id").distinct()
  // (cust_id, seg) pairs from table a
  val a = sqlContext.read.parquet(get_a_path(date)).select("cust_id", "seg")
  l.registerTempTable("et")
  a.registerTempTable("acct")
  // keep the rows of a whose cust_id appears among l's ids
  // (assumes l.id and a.cust_id refer to the same identifier)
  sqlContext.sql(
    """
      |select a.seg as seg,
      |       a.cust_id as id
      |from
      |  acct a join et e on a.cust_id = e.id
    """.stripMargin)
}
def getTargetMap(sqlContext: SQLContext, targetDF: DataFrame): Map[String, String] = {
  val tmpTable = "tmp"
  targetDF.registerTempTable(tmpTable)
  // number of distinct ids per seg, collected to the driver (there are only a few segs)
  sqlContext.sql(
    s"""
       |select
       |  seg, count(distinct id)
       |from
       |  $tmpTable
       |group by
       |  seg
    """.stripMargin).collect()
    .foldLeft(Map[String, String]())((map, row) => map + (row.getString(0) -> row.get(1).toString))
}
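Basically, there are two steps. First, we join tables l and a: we take the id and the seg from table a, keeping only the rows of a whose id exists in table l. Second, after getting the (id, seg) table (which we call target below), we count the distinct ids within each seg, so we run count and group by on the target table.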
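To pin down the intended semantics of the two steps, they can be mimicked on plain Scala collections. This is a minimal sketch, not Spark code: `Acct`, `LinkedCount`, and the method names are hypothetical. In Spark the same logic would roughly be a join followed by `groupBy("seg").agg(countDistinct("id"))`.

```scala
// Hypothetical in-memory model of the two steps (not Spark code).
final case class Acct(custId: String, seg: String)

object LinkedCount {
  // Step 1: keep (id, seg) pairs from table a whose cust_id appears in table l.
  def target(lIds: Seq[String], accts: Seq[Acct]): Seq[(String, String)] = {
    val linked = lIds.toSet
    accts.filter(a => linked.contains(a.custId)).map(a => (a.custId, a.seg))
  }

  // Step 2: count distinct ids per seg on the target table.
  def countDistinctPerSeg(target: Seq[(String, String)]): Map[String, Long] =
    target
      .groupBy { case (_, seg) => seg }
      .map { case (seg, rows) => seg -> rows.map(_._1).distinct.size.toLong }
}
```

For example, with l = {1, 2, 3} and a containing (1, x), (1, x), (2, x), (4, y), (3, y), the row with cust_id 4 is dropped and the result is x → 2, y → 1.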
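The target table has about 2 billion records but only 6 distinct segs, and one seg alone accounts for more than half of the records, so the data is heavily skewed.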
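A common way to spread a hot grouping key is to split each seg into sub-buckets derived from a hash of id, count distinct ids per (seg, bucket), and then sum the partial counts per seg. This stays exact for count(distinct) because a given id always lands in the same bucket, so no id is counted in two buckets. The sketch below shows only the arithmetic on plain Scala collections; `BucketedDistinct` and the bucket count are assumptions, and whether this rewrite avoids the OutOfMemoryError in the stack trace above has not been verified.

```scala
// Hypothetical sketch of a salted (bucketed) two-stage distinct count.
object BucketedDistinct {
  // Stage 1: distinct ids per (seg, bucket); the bucket depends only on the id,
  // so each id falls into exactly one bucket within its seg.
  def partialCounts(target: Seq[(String, String)], buckets: Int): Map[(String, Int), Long] =
    target
      .groupBy { case (id, seg) => (seg, Math.floorMod(id.hashCode, buckets)) }
      .map { case (key, rows) => key -> rows.map(_._1).distinct.size.toLong }

  // Stage 2: sum partial counts per seg; buckets are disjoint in id, so the sum is exact.
  def countDistinctPerSeg(target: Seq[(String, String)], buckets: Int): Map[String, Long] =
    partialCounts(target, buckets)
      .groupBy { case ((seg, _), _) => seg }
      .map { case (seg, parts) => seg -> parts.values.sum }
}
```

The result does not depend on the number of buckets; a larger bucket count only spreads the work for the dominant seg over more groups.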
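We use Hive on Spark. The code is written in Scala, and we submit the job to Spark with the spark-submit shell script. I found that the hive.groupby.skewindata parameter might help, so I set it to true via sqlContext.setConf, but I don't know whether it actually takes effect.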