作为spark作业提交时spark rddMap中的nullpointerexception

v1l68za4  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(464)

我们试图提交一个spark作业(spark2.0,hadoop2.7.2),但是由于某些原因,我们在emr中收到了一个相当神秘的npe。作为一个scala程序,一切都运行得很好,所以我们不确定是什么导致了这个问题。以下是堆栈跟踪:
18:02:55,271错误utils:91 - 正在中止位于org.apache.spark.sql.catalyst.expressions.generatedclass$generatediterator.agg\u doaggregatewithkeys$(未知源)的任务java.lang.nullpointerexception,位于org.apache.spark.sql.catalyst.expressions.generatedclass$generatediterator.processnext(未知源)org.apache.spark.sql.execution.bufferedrowtiterator.hasnext(bufferedrowtiterator。java:43)在org.apache.spark.sql.execution.whistagecodegenexec$$anonfun$8$$anon$1.hasnext(whistagecodegenexec。scala:370)在scala.collection.iterator$$anon$12.hasnext(iterator。scala:438)在org.apache.spark.sql.execution.datasources.defaultwritercontainer$$anonfun$writerows$1.apply$mcv$sp(writercontainer)。scala:253)在org.apache.spark.sql.execution.datasources.defaultwritercontainer$$anonfun$writerows$1.apply(writercontainer。scala:252)在org.apache.spark.sql.execution.datasources.defaultwritercontainer$$anonfun$writerows$1.apply(writercontainer)。scala:252)在org.apache.spark.util.utils$.trywithsafefinallyandfailurecallbacks(utils。scala:1325)在org.apache.spark.sql.execution.datasources.defaultwritercontainer.writerows(writercontainer。scala:258)在org.apache.spark.sql.execution.datasources.insertintohadoopfsrelationcommand$$anonfun$run$1$$anonfun$apply$mcv$sp$1.apply(insertintohadoopfsrelationcommand)。scala:143)在org.apache.spark.sql.execution.datasources.insertintohadoopfsrelationcommand$$anonfun$run$1$$anonfun$apply$mcv$sp$1.apply(insertintohadoopfsrelationcommand)。scala:143)在org.apache.spark.scheduler.resulttask.runtask(resulttask。scala:70)在org.apache.spark.scheduler.task.run(task。scala:85)在org.apache.spark.executor.executor$taskrunner.run(executor。scala:274)位于java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor。java:1142)在java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor。java:617)在java.lang.thread.run(线程。java:745)
据我们所知,这种情况是通过以下方法发生的:

def process(dataFrame: DataFrame, S3bucket: String) = {
  dataFrame.map(row =>
      "text|label"
  ).coalesce(1).write.mode(SaveMode.Overwrite).text(S3bucket)
}

我们将其缩小到map函数,因为它在作为spark作业提交时起作用:

def process(dataFrame: DataFrame, S3bucket: String) = {
  dataFrame.coalesce(1).write.mode(SaveMode.Overwrite).text(S3bucket)
}

有人知道是什么导致了这个问题吗?还有,我们怎么解决呢?我们被难住了。

ifmq2ha2

ifmq2ha21#

我想你得到了一个 NullPointerException 当工作进程试图访问 SparkContext 只出现在司机身上而不是工人身上的物体。
coalesce()重新划分数据。当您只请求一个分区时,它将尝试压缩一个分区*中的所有数据。这可能会对应用程序的内存占用造成很大压力。
一般来说,最好不要只在1个内存中收缩分区。
有关更多信息,请阅读以下内容:spark nullpointerexception with saveastextfile和this。
如果您不确定分区是什么,我在spark的memoryoverhead一期中向自己解释了它。

相关问题