I am following a basic Spark tutorial in which we are asked to import a .csv file, run a query against it and collect the query results.
To do this, we first create a Persona case class to hold the data, plus a function that turns each line of the dataset into a Persona object, like this:
case class Persona(ID: Int, nombre: String, edad: Int, numeroAmigos: Int)

def procesarPersona(linea: String): Persona = {
  val campos = linea.split(",")
  Persona(campos(0).toInt, campos(1), campos(2).toInt, campos(3).toInt)
}
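Just to make sure the parser itself behaves, I can run it on a single made-up line in the same comma-separated format (the sample values below are invented, not taken from the real file):

// Hypothetical sample line: id,name,age,numberOfFriends
val ejemplo = procesarPersona("0,Will,33,385")
println(ejemplo) // prints Persona(0,Will,33,385)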
Then we create the Spark SQL session and load the data from the .csv file, like this:
val spark = SparkSession.builder.appName("Personas").getOrCreate()
val lineas = spark.sparkContext.textFile("file:///home/Eduardo/Downloads/friends.csv")
Then we map each line through that function to parse it:
val personas = lineas.map(procesarPersona) // resulting RDD of Persona
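As a quick sanity check of this step, a few parsed records can be pulled back to the driver, for example:

// Look at the first few Persona records to confirm the parsing worked
personas.take(5).foreach(println)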
Next, we convert it to a Dataset and register it as a temporary SQL view with the following commands:
val estructuraDatos = personas.toDS
estructuraDatos.printSchema
estructuraDatos.createOrReplaceTempView("personas")
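For comparison, I understand the same temporary view could also be built without the hand-written parser, by letting Spark's CSV reader load the file directly; a rough sketch (the header option and column names are my assumptions about friends.csv):

// Alternative sketch: read the CSV with Spark's built-in reader
// (assumes friends.csv has no header row and the columns match Persona in order)
val personasDF = spark.read
  .option("header", "false")
  .option("inferSchema", "true")
  .csv("file:///home/Eduardo/Downloads/friends.csv")
  .toDF("ID", "nombre", "edad", "numeroAmigos")
personasDF.printSchema
personasDF.createOrReplaceTempView("personas")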
Then I run the SQL query and try to collect the results:
val mayoresEdad = spark.sql("SELECT * FROM personas WHERE edad >= 18")
val resultados = mayoresEdad.collect()
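Once the collect works, the intention is simply to print the returned rows, along the lines of:

// Print every Row returned by the query
resultados.foreach(println)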
The problem is that every step up to this point matches the results shown in the video, but after running the query I cannot collect the results without triggering an error. The error I get is the following:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 8, 192.168.1.43, executor driver): java.lang.ClassCastException: class Persona cannot be cast to class Persona (Persona is in unnamed module of loader org.apache.spark.repl.ExecutorClassLoader @5337f0dc; Persona is in unnamed module of loader scala.tools.nsc.interpreter.IMain$TranslatingClassLoader @2229c7c)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
I have already checked my Spark version and it is correct. I may not have the same older Scala version that my professor uses on his Mac, but I doubt that is the cause.
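For the record, the versions actually running inside the Zeppelin paragraph can be printed like this:

println(spark.version)                       // Spark version used by the interpreter
println(scala.util.Properties.versionString) // Scala version, e.g. "version 2.12.x"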
Do you know what the error is?
I am using Spark 3.0.2, Ubuntu 20.04 and Zeppelin 0.8.9-bin-all.
Thanks for your help!