无法使用spark scala for csv文件显示Dataframe记录

ni65a41a  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(436)

我创造了 dataframe 通过转换 RDDDF 使用map函数。当我试图显示记录时,它给了我 exception .
下面是我的代码:

//Created case class
case class employees(emp_id:java.lang.Long,emp_name:String, job_name:String,manager_id:java.lang.Long,hire_date:String,salary:java.lang.Double,commision:java.lang.Double,dep_id:java.lang.Long);

// Created DF
val employeesDf=rd1.map(_.split(",")).map(p=>employees(p(0).toLong,p(1),p(2),p(3).toLong,p(4),p(5).toDouble,p(6).toDouble,p(7).toLong)).toDF()

//Schema    
scala> employeesDf
    res5: org.apache.spark.sql.DataFrame = [emp_id: bigint, emp_name: string, job_name: string, manager_id: bigint, hire_date: string, salary: double, commision: double, dep_id: bigint]

但是当我试图显示一些记录时,它抛出了一个异常。下面是 error :

scala> employeesDf.show()
18/08/05 07:08:43 ERROR executor.Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.NumberFormatException: For input string: ""

以下是员工的数据集:
数据集图像
那我哪里做错了??我被困了好几个小时。。

vshtjzan

vshtjzan1#

你的 dataframe's 某些列包含 empty strings 你在试图 parsing 让他们 double , long . 所以要么改变这些 columnstrings 以防分类或使用 if-else 根据您的业务需求分析时的条件。就像下面一样

//Sample data in test.txt

65646,JONAS,MANAGER,68319,1991-04-02,2957.00,,2001
64989,ADELYN,SALESMAN,66928,1991-02-20,1700.00,400.00,3001

val rdd=sc.textFile("C:\\spark\\programs\\test.txt").filter(p=>{p!=null && p.trim.length>0})

// Created DF
rdd.map(_.split(",")).map(p=>employees(p(0).toLong,p(1),p(2),if(p(3).length>0) p(3).toLong else 0,p(4),if(p(5).length>0) p(5).toDouble else 0, if(p(6).length>0) p(6).toDouble else 0,p(7).toLong)).toDF().show

//Output: 

+------+--------+--------+----------+----------+------+---------+------+
|emp_id|emp_name|job_name|manager_id| hire_date|salary|commision|dep_id|
+------+--------+--------+----------+----------+------+---------+------+
| 65646|   JONAS| MANAGER|     68319|1991-04-02|2957.0|      0.0|  2001|
| 64989|  ADELYN|SALESMAN|     66928|1991-02-20|1700.0|    400.0|  2001|
+------+--------+--------+----------+----------+------+---------+------+
anauzrmj

anauzrmj2#

我通过创建自定义项并在map函数中使用它来解决我的问题。以下是代码:

//Create case class for schema :

case class employees(emp_id:java.lang.Long,emp_name:String, job_name:String,manager_id:java.lang.Long,hire_date:String,salary:java.lang.Double,commision:java.lang.Double,dep_id:java.lang.Long);

// Create UDF’s for Long and double :

def getDoubleValue(value:String):Double= {
  val output:Double=if (value != null && value.trim.length>0) {
    value.toDouble
  }else{
    0D
  }
  output
}
def getLongValue(value:String):Long= {
  val output:Long=if (value != null && value.trim.length>0) {
    value.toLong
  }else{
    0L
  }
  output
}

// Create RDD

val rdd=sc.textFile("file:////home/hduser/Desktop/Employees/employees.txt").filter(p=>{p!=null && p.trim.length>0})

// Create DF

val df=rdd.map(_.split(",")).map(p=>employees(getLongValue(p(0)),p(1),p(2),getLongValue(p(3)),p(4),getDoubleValue(p(5)),getDoubleValue(p(6)),getLongValue(p(7)))).toDF()

// Display records:

df.show();

相关问题