Spark Hive job keeps failing with out of memory: Java heap space error

a6b3iqyw posted on 2021-06-28 in Hive

When I run the Hive job, the failure always happens at the same stage and task:

16/08/29 03:45:27 WARN TaskSetManager: Lost task 296.1 in stage 10.0 (TID 22783, lvshdc2dn0501.lvs.paypal.com): java.lang.OutOfMemoryError: Java heap space
    at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.expandPointerArray(UnsafeInMemorySorter.java:115)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.insertRecord(UnsafeInMemorySorter.java:128)
    at org.apache.spark.sql.execution.UnsafeKVExternalSorter.<init>(UnsafeKVExternalSorter.java:113)
    at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter(UnsafeFixedWidthAggregationMap.java:257)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.switchToSortBasedAggregation(TungstenAggregationIterator.scala:435)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:379)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

I join two tables, then group and count the result, and finally collect the counts. The code is as follows:

import org.apache.spark.sql.{DataFrame, SQLContext}

override def monitor(sqlContext: SQLContext, date: String) = {
  sqlContext.setConf("hive.map.aggr", "true")
  sqlContext.setConf("hive.groupby.skewindata", "true")
  //get data
  val targetDF = getLinked(date, sqlContext)
  //count what we need
  getTargetMap(sqlContext, targetDF)

}

def getLinked(date: String, sqlContext: SQLContext): DataFrame = {
  val l = sqlContext.read.parquet(get_l_path(date)).select("id").distinct()
  val a = sqlContext.read.parquet(get_a_path(date)).select("cust_id", "seg")
  // register the two DataFrames read above for the join below
  l.registerTempTable("et")
  a.registerTempTable("acct")

  sqlContext.sql(
  """
    |select
    | a.seg as seg,
    | a.cust_id as id
    |from
    | acct a join et e on a.cust_id = e.id
  """.stripMargin)
}

def getTargetMap(sqlContext: SQLContext, targetDF: DataFrame): Map[String, String] = {
  val tmpTable = "tmp"
  targetDF.registerTempTable(tmpTable)
  sqlContext.sql(
  s"""
    |select
    | seg, count(distinct id)
    |from
    | $tmpTable
    |group by
    | seg
  """.stripMargin).collect()
  .foldLeft(Map[String, String]())((map, row)=>map + (row.getString(0)->row.get(1).toString))
}

Basically there are two steps. First, we join tables l and a: we take the id and the seg from table a, keeping only the ids that also exist in table l. Second, once we have the (id, seg) table (which we call target below), we count the distinct ids within each segment, i.e. count with group by on the target table.
The target table has about 2 billion records but only 6 distinct segments, and one segment alone accounts for more than half of the records, so the data is clearly skewed.
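For illustration, a minimal sketch (not from the job above, the helper name is made up, and it assumes the Spark 1.5-era DataFrame API that the stack trace points to) of doing the distinct count in two explicit stages, so the deduplication shuffle is keyed by (seg, id) instead of funnelling every id of the hot segment into a single group:

def countDistinctIdsPerSeg(sqlContext: SQLContext, targetDF: DataFrame): Map[String, String] = {
  // stage 1: dedupe (seg, id) pairs; the shuffle key is (seg, id), so rows of the
  // over-represented segment are spread across many partitions
  val deduped = targetDF.select("seg", "id").dropDuplicates(Seq("seg", "id"))
  // stage 2: a plain row count per seg; only 6 tiny groups remain at this point
  deduped.groupBy("seg").count()
    .collect()
    .map(row => row.getString(0) -> row.getLong(1).toString)
    .toMap
}

Depending on the Spark version, the planner may already rewrite count(distinct id) into something similar, so this mainly makes the two shuffles explicit.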
We use Hive on Spark. The code is in Scala, and we submit the job with Spark's spark-submit shell script. I found that the hive.groupby.skewindata parameter might help, so I set it to true via sqlContext.setConf, but I don't know whether it actually takes effect?
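For comparison, a sketch of the Spark-side settings that act on the aggregation in the stack trace directly (the values below are placeholders, not tuned ones):

// more (and therefore smaller) reduce-side partitions for the group-by shuffle
sqlContext.setConf("spark.sql.shuffle.partitions", "2000")

// the executor heap can only be raised at submit time, e.g. in the spark-submit script:
//   spark-submit --executor-memory 8g ...
// sqlContext.setConf cannot change spark.executor.memory once the executors are running.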
