转换字符串并与df列值spark scala进行比较

nafvub8i  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(282)

我有一个字串 val current_dates = "A:2021-04-02,B:2021-04-02,C:2021-04-01,D:2021-04-01" . 在这里 A,B,C,D 是id字段及其对应的日期。
现在我有了一个输入Dataframe,其中有多条记录的id&date列。

val input_df = sc.parallelize(Seq(("A","2021-04-01"),("A","2021-04-02"),("B","2021-04-01"),("B","2021-04-02"),("C","2021-04-01"),("C","2021-04-02"),("D","2021-04-01"),("D","2021-04-02"))).toDF("id","create_date")

input_df.show()

+---+-----------+
| id|create_date|
+---+-----------+
|  A| 2021-04-01|
|  A| 2021-04-02|
|  B| 2021-04-01|
|  B| 2021-04-02|
|  C| 2021-04-01|
|  C| 2021-04-02|
|  D| 2021-04-01|
|  D| 2021-04-02|
+---+-----------+

现在,我想将每个记录的日期值与字符串中每个id的对应日期进行比较,并在dataframe中派生新的date列。

expected_df.select((input_df.columns ++ Array("new_dt")).head, (input_df.columns ++ Array("new_dt")).tail: _*).orderBy("id").show()
+---+-----------+----------+
| id|create_date|    new_dt|
+---+-----------+----------+
|  A| 2021-04-01|2021-04-02|
|  A| 2021-04-02|2021-04-02|
|  B| 2021-04-02|2021-04-02|
|  B| 2021-04-01|2021-04-02|
|  C| 2021-04-02|2021-04-02|
|  C| 2021-04-01|2021-04-01|
|  D| 2021-04-01|2021-04-01|
|  D| 2021-04-02|2021-04-02|
+---+-----------+----------+

目前,我正在将字符串转换为另一个Dataframe,并将其与输入Dataframe连接起来,然后以下面的方式导出新列。

val current_dates_df = sc.parallelize(current_dates.split(",").map(_.split(":")).map{ case Array(a,b) => (a, b) }).toDF("previous_run_id", "previous_run_date")    

current_dates_df.show()   

+---------------+-----------------+
|previous_run_id|previous_run_date|
+---------------+-----------------+
|              A|       2021-04-02|
|              B|       2021-04-02|
|              C|       2021-04-01|
|              D|       2021-04-01|
+---------------+-----------------+ 

val deriveNewDt: UserDefinedFunction = udf[String, String, String]((create_date: String, previous_run_date: String) => {
    val date_format: String = "yyyy-MM-dd"
    val new_dt = {
        if (new SimpleDateFormat(date_format).parse(create_date).after(new SimpleDateFormat(date_format).parse(previous_run_date))) create_date 
        else previous_run_date
    }
    new_dt
})    

val joined_df = input_df.join(current_dates_df, input_df("id") === current_dates_df("previous_run_id"), "left_outer")    

val expected_df = joined_df.withColumn("new_dt", deriveNewDt($"create_date", $"previous_run_date"))    

expected_df.select((input_df.columns ++ Array("new_dt")).head, (input_df.columns ++ Array("new_dt")).tail: _*).show()

有没有更好的方法来处理字符串并在不将字符串转换为Dataframe的情况下实现相同的功能。

whlutmcx

whlutmcx1#

你可以用 str_to_map 得到一个给定的日期 id ,并使用 greatest 要获得两者之间的较晚日期:

val current_dates = "A:2021-04-02,B:2021-04-02,C:2021-04-01,D:2021-04-01"

val result = input_df.withColumn(
    "new_dt", 
    expr(s"greatest(str_to_map('$current_dates, ',', ':')[id], create_date)")
)

result.show
+---+-----------+----------+
| id|create_date|    new_dt|
+---+-----------+----------+
|  A| 2021-04-01|2021-04-02|
|  A| 2021-04-02|2021-04-02|
|  B| 2021-04-01|2021-04-02|
|  B| 2021-04-02|2021-04-02|
|  C| 2021-04-01|2021-04-01|
|  C| 2021-04-02|2021-04-02|
|  D| 2021-04-01|2021-04-01|
|  D| 2021-04-02|2021-04-02|
+---+-----------+----------+

相关问题