Date comparison (less than) in a Spark Scala RDD

xsuvu9jc · asked 2021-06-01 · in Hadoop

I want to print the data of employees who joined before 1991. Here is my sample data:

69062,FRANK,ANALYST,5646,1991-12-03,3100.00,,2001
63679,SANDRINE,CLERK,69062,1990-12-18,900.00,,2001

Initial RDD that loads the data:

val rdd = sc.textFile("file:////home/hduser/Desktop/Employees/employees.txt").filter(p => p != null && p.trim.length > 0)

A user-defined function to convert the date string column into a java.util.Date:

import java.util.Date
import java.text.SimpleDateFormat

def convertStringToDate(s: String): Date = {
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
    dateFormat.parse(s)
}
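A quick REPL sanity check (the IST in the output is just the JVM's default time zone, as also visible in the tuples below):

scala> convertStringToDate("1991-12-03")
res0: java.util.Date = Tue Dec 03 00:00:00 IST 1991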

Mapping each column to its data type:

val dateRdd = rdd.map(_.split(",")).map(p => (
  if (p(0).length > 0) p(0).toLong else 0L,
  p(1),
  p(2),
  if (p(3).length > 0) p(3).toLong else 0L,
  convertStringToDate(p(4)),
  if (p(5).length > 0) p(5).toDouble else 0D,
  if (p(6).length > 0) p(6).toDouble else 0D,
  if (p(7).length > 0) p(7).toInt else 0
))

Now I get the data as tuples, like this:

(69062,FRANK,ANALYST,5646,Tue Dec 03 00:00:00 IST 1991,3100.0,0.0,2001)
(63679,SANDRINE,CLERK,69062,Tue Dec 18 00:00:00 IST 1990,900.0,0.0,2001)

Now, when I execute the following command, I get this error:

scala> dateRdd.map(p=>(!(p._5.before("1991")))).foreach(println)
<console>:36: error: type mismatch;
 found   : String("1991")
 required: java.util.Date
              dateRdd.map(p=>(!(p._5.before("1991")))).foreach(println)
                                            ^

So where am I going wrong?


shyt4zoc #1

Since you are working with an RDD rather than a DataFrame, and your date strings only need a simple comparison, here is an uncomplicated RDD approach. ISO-8601 date strings (yyyy-MM-dd) sort lexicographically in chronological order, so a plain string comparison is enough:

val rdd = sc.parallelize(Seq(
  (69062, "FRANK", "ANALYST", 5646, "1991-12-03", 3100.00, 2001),
  (63679, "SANDRINE", "CLERK", 69062, "1990-12-18", 900.00, 2001)
))
rdd.filter(p => p._5 < "1991-01-01").foreach(println)
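The same check works directly on the text file from the question (a minimal sketch, assuming the employees.txt layout shown above, where field index 4 holds the join date):

val textRdd = sc.textFile("file:////home/hduser/Desktop/Employees/employees.txt")
  .filter(p => p != null && p.trim.length > 0)
// Field 4 is the ISO-formatted join date, so "<" compares it chronologically.
textRdd.filter(line => line.split(",")(4) < "1991-01-01").foreach(println)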

gg0vcinb #2

There is no need to convert the date with the legacy SimpleDateFormat. Use java.time instead: since the fifth field (index 4) is already in the ISO format that LocalDate.parse expects by default, you can simply use the RDD step below. Check this out:

val rdd = spark.sparkContext.textFile("in\\employees.txt").filter { x =>
  val y = x.split(",")
  java.time.LocalDate.parse(y(4)).isBefore(java.time.LocalDate.parse("1991-01-01"))
}

And this:

rdd.collect.foreach(println)

gives the following result:

63679,SANDRINE,CLERK,69062,1990-12-18,900.00,,2001

Hope this answers your question.
Edit 1:
Using Java 7 and the SimpleDateFormat API:

import java.util.Date
import java.text.SimpleDateFormat
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object DTCheck {
  def main(args: Array[String]): Unit = {

    // Parse a yyyy-MM-dd string into a java.util.Date
    def convertStringToDate(s: String): Date = {
      val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
      dateFormat.parse(s)
    }

    Logger.getLogger("org").setLevel(Level.ERROR)
    val spark = SparkSession.builder().appName("Employee < 1991").master("local[*]").getOrCreate()

    val sdf = new SimpleDateFormat("yyyy-MM-dd")
    val dt_1991 = sdf.parse("1991-01-01")

    // Keep only rows whose join date (field index 4) falls before 1991-01-01
    val rdd = spark.sparkContext.textFile("in\\employees.txt").filter { x =>
      val y = x.split(",")
      convertStringToDate(y(4)).before(dt_1991)
    }
    rdd.collect.foreach(println)
  }
}
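For completeness, the failing line from the question can also be fixed in place: Date.before expects a java.util.Date, not a String, and a filter matches the stated goal (employees who joined before 1991) better than mapping to booleans. A minimal sketch, reusing the question's own convertStringToDate and dateRdd:

// "1991" is a String; before() needs a java.util.Date, so parse a cutoff first.
val cutoff = convertStringToDate("1991-01-01")
dateRdd.filter(p => p._5.before(cutoff)).foreach(println)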
