Scala: converting a DF to an RDD - error java.lang.NumberFormatException: For input string: "age"

6uxekuva · posted 2021-05-27 in Spark

I'm doing some small exercises with Scala and Spark, and I'm facing an error I can't solve.
I'm trying to map a CSV file into a DF, but an error is returned.

// Adding schema to RDDs - Initialization
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import spark.implicits._

case class Employee(name: String, age: Long)
val employeeDF = spark.sparkContext.textFile("./employee.txt")
  .map(_.split(","))
  .map(attributes => Employee(attributes(0), attributes(1).trim.toInt))
  .toDF()
employeeDF.createOrReplaceTempView("employee")

var youngstersDF = spark.sql("SELECT name,age FROM employee WHERE age BETWEEN 18 AND 30")
youngstersDF.map(youngster => "Name: " + youngster(0)).show()

When I try to map over the names, the following error is returned:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 19.0 failed 1 times, most recent failure: Lost task 0.0 in stage 19.0 (TID 21, 192.168.0.122, executor driver): java.lang.NumberFormatException: For input string: "age"

The file content is:

name,age
John,28
Andrew,36
Clarke,22
Kevin,42
I googled around but couldn't find a solution/answer.
Can anyone help?
Many thanks, Xavy


fslejnso1#

I would try this:

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType

case class Employee(name: String, age: Long)

def getType[T: scala.reflect.runtime.universe.TypeTag](obj: T) =
  scala.reflect.runtime.universe.typeOf[T]

val path = getClass.getResource("/csv/employee.txt").getPath
val ds = spark.read
  .schema(ScalaReflection.schemaFor[Employee].dataType.asInstanceOf[StructType])
  .option("header", true)
  .option("sep", ",")
  .csv(path)
  .as[Employee]

println(getType(ds))
/**
  * org.apache.spark.sql.Dataset[com.som.spark.learning.Employee]
  */
ds.show(false)
ds.printSchema()
/**
  * +------+---+
  * |name  |age|
  * +------+---+
  * |John  |28 |
  * |Andrew|36 |
  * |Clarke|22 |
  * |Kevin |42 |
  * +------+---+
  *
  * root
  * |-- name: string (nullable = true)
  * |-- age: long (nullable = true)
  */
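
As a side note (a sketch, not part of the original answer): the same `StructType` can be derived a bit more directly from the case class's encoder via `Encoders.product`, avoiding the explicit `ScalaReflection` cast:

import org.apache.spark.sql.Encoders

// Encoders.product derives the schema (a StructType) for any case class
val employeeSchema = Encoders.product[Employee].schema

val ds2 = spark.read
  .schema(employeeSchema)
  .option("header", true)
  .csv(path)
  .as[Employee]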

wko9yo5t2#

You need to filter out the header row from the data before converting to a DataFrame: the first line of the file is `name,age`, and since `textFile` reads it as ordinary data, `attributes(1).trim.toInt` ends up being called on the string "age", which throws the NumberFormatException. Example:
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import spark.implicits._

case class Employee(name: String, age: Long)
val employeeRDD = spark.sparkContext.textFile("./employee.txt")

//storing header string
val header = employeeRDD.first()

//filter out the header from the data
val employeeDF = employeeRDD
  .filter(r => r != header)
  .map(_.split(","))
  .map(attributes => Employee(attributes(0), attributes(1).trim.toInt))
  .toDF()

employeeDF.createOrReplaceTempView("employee")

sql("select * from employee").show()
//+------+---+
//| name|age|
//+------+---+
//| John| 28|
//|Andrew| 36|
//|Clarke| 22|
//| Kevin| 42|
//+------+---+
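
With the header filtered out, the original query from the question should also run without the exception. For instance (expected rows derived from the sample data; output layout approximate):

val youngstersDF = spark.sql("SELECT name, age FROM employee WHERE age BETWEEN 18 AND 30")
youngstersDF.map(youngster => "Name: " + youngster(0)).show()
//+------------+
//|       value|
//+------------+
//|  Name: John|
//|Name: Clarke|
//+------------+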

FYI, you can also use `spark.read.csv` directly and pass the schema when reading:

import org.apache.spark.sql.types._

val sch = new StructType().add("name", StringType).add("age", LongType)
val df = spark.read
  .option("header", true)
  .option("delimiter", ",")
  .schema(sch)
  .csv("./employee.txt")

df.show()
//+------+---+
//| name|age|
//+------+---+
//| John| 28|
//|Andrew| 36|
//|Clarke| 22|
//| Kevin| 42|
//+------+---+
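
As a side note (an assumption beyond the original answer): if the file might contain other rows that don't parse against the schema, the CSV reader's standard `mode` option can drop them instead of failing the whole job:

//PERMISSIVE is the default; DROPMALFORMED silently discards rows
//that cannot be parsed against the supplied schema
val dfSafe = spark.read
  .option("header", true)
  .option("mode", "DROPMALFORMED")
  .schema(sch)
  .csv("./employee.txt")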
