我有一个字串 val current_dates = "A:2021-04-02,B:2021-04-02,C:2021-04-01,D:2021-04-01"
. 在这里 A,B,C,D
是id字段及其对应的日期。
现在我有了一个输入Dataframe,其中有多条记录的id&date列。
val input_df = sc.parallelize(Seq(("A","2021-04-01"),("A","2021-04-02"),("B","2021-04-01"),("B","2021-04-02"),("C","2021-04-01"),("C","2021-04-02"),("D","2021-04-01"),("D","2021-04-02"))).toDF("id","create_date")
input_df.show()
+---+-----------+
| id|create_date|
+---+-----------+
| A| 2021-04-01|
| A| 2021-04-02|
| B| 2021-04-01|
| B| 2021-04-02|
| C| 2021-04-01|
| C| 2021-04-02|
| D| 2021-04-01|
| D| 2021-04-02|
+---+-----------+
现在,我想将每个记录的日期值与字符串中每个id的对应日期进行比较,并在dataframe中派生新的date列。
expected_df.select((input_df.columns ++ Array("new_dt")).head, (input_df.columns ++ Array("new_dt")).tail: _*).orderBy("id").show()
+---+-----------+----------+
| id|create_date| new_dt|
+---+-----------+----------+
| A| 2021-04-01|2021-04-02|
| A| 2021-04-02|2021-04-02|
| B| 2021-04-02|2021-04-02|
| B| 2021-04-01|2021-04-02|
| C| 2021-04-02|2021-04-02|
| C| 2021-04-01|2021-04-01|
| D| 2021-04-01|2021-04-01|
| D| 2021-04-02|2021-04-02|
+---+-----------+----------+
目前,我正在将字符串转换为另一个Dataframe,并将其与输入Dataframe连接起来,然后以下面的方式导出新列。
val current_dates_df = sc.parallelize(current_dates.split(",").map(_.split(":")).map{ case Array(a,b) => (a, b) }).toDF("previous_run_id", "previous_run_date")
current_dates_df.show()
+---------------+-----------------+
|previous_run_id|previous_run_date|
+---------------+-----------------+
| A| 2021-04-02|
| B| 2021-04-02|
| C| 2021-04-01|
| D| 2021-04-01|
+---------------+-----------------+
val deriveNewDt: UserDefinedFunction = udf[String, String, String]((create_date: String, previous_run_date: String) => {
val date_format: String = "yyyy-MM-dd"
val new_dt = {
if (new SimpleDateFormat(date_format).parse(create_date).after(new SimpleDateFormat(date_format).parse(previous_run_date))) create_date
else previous_run_date
}
new_dt
})
val joined_df = input_df.join(current_dates_df, input_df("id") === current_dates_df("previous_run_id"), "left_outer")
val expected_df = joined_df.withColumn("new_dt", deriveNewDt($"create_date", $"previous_run_date"))
expected_df.select((input_df.columns ++ Array("new_dt")).head, (input_df.columns ++ Array("new_dt")).tail: _*).show()
有没有更好的方法来处理字符串并在不将字符串转换为Dataframe的情况下实现相同的功能。
1条答案
按热度按时间whlutmcx1#
你可以用
str_to_map
得到一个给定的日期id
,并使用greatest
要获得两者之间的较晚日期: