Spark Scala: RDD distinct NullPointerException

whitzsjs · posted 2021-05-16 in Spark
Follow (0) | Answers (2) | Views (612)

I'm taking my first small steps with Spark. My exercise loads a JSON file into an RDD, selects one column, and then uses distinct to get the unique values. The column I'm filtering on contains multiple comma-separated values per row, which have to be split.

val sqlContext = spark.sqlContext
import org.apache.spark.sql.hive.HiveContext
val hiveCtx = new HiveContext(sc)
import hiveCtx.implicits._
val bizDF = hiveCtx.jsonFile("/home/xpto/Documents/PersonalProjects/Yelp_P1/yelp_academic_dataset_business.json")
val catRdd = bizDF.select("categories").rdd
  .flatMap(row => row.getString(0).split(",").map(_.trim))
  .distinct

When I run the statement "catRdd.take(10).foreach(println)", an exception is returned:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 234.0 failed 1 times, most recent failure: Lost task 0.0 in stage 234.0 (TID 682, 192.168.0.122, executor driver):

java.lang.NullPointerException
    at $anonfun$catRdd$1(<console>:39)
    at $anonfun$catRdd$1$adapted(<console>:39)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
  at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1423)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
  at org.apache.spark.rdd.RDD.take(RDD.scala:1396)
  ... 48 elided
Caused by: java.lang.NullPointerException
  at $anonfun$catRdd$1(<console>:39)
  at $anonfun$catRdd$1$adapted(<console>:39)
  at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
  at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
  at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
  at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
  at org.apache.spark.scheduler.Task.run(Task.scala:127)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
  ... 3 more

The Spark version I'm running is 3.0.1 (built for Scala 2.12).

toe95027 1#

The JSON file is multi-line, and hiveCtx does not support that. Try this with the Spark session instead:

val bizDF = spark.read.format("json")
  .option("multiline", "true")
  .load("/home/xpto/Documents/PersonalProjects/Yelp_P1/yelp_academic_dataset_business.json")
val catRdd = bizDF.select("categories").rdd
  .flatMap(row => row.getString(0).split(",").map(_.trim))
  .distinct
catRdd.take(10).foreach(println)
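
Note that the stack trace points into the flatMap lambda ($anonfun$catRdd$1), which is consistent with row.getString(0) returning null for rows whose categories field is null. If that is the case, a null guard avoids the NullPointerException however the file is read (a minimal sketch, assuming the Yelp data does contain null categories):

// Wrap the possibly-null column value in an Option so that null rows
// are dropped instead of throwing inside split
val catRdd = bizDF.select("categories").rdd
  .flatMap(row => Option(row.getString(0)))
  .flatMap(_.split(",").map(_.trim))
  .distinct
catRdd.take(10).foreach(println)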

7rfyedvj 2#

I found a solution that fits my requirements:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// Split the comma-separated categories column, keep the first entry,
// and deduplicate; the DataFrame split/getItem functions propagate
// nulls instead of throwing, which sidesteps the NullPointerException
val v1 = bizDF.withColumn("categories", split(col("categories"), ","))
  .select(col("categories")(0).as("description"))
  .distinct
  .coalesce(1)
  .orderBy(asc("description"))

// Assign a sequential id over the alphabetically ordered descriptions
val windowSpec = Window.orderBy("description")
val v2 = v1.withColumn("id", row_number().over(windowSpec))
val v3 = v2.select("id", "description")
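
For a quick sanity check, the first few rows can be printed (the exact categories shown depend on the dataset):

// Print the first ten id/description pairs without truncating long names
v3.show(10, truncate = false)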
