向timestamp列添加天数

3z6pesqy  于 2021-07-14  发布在  Spark
关注(0)|答案(2)|浏览(314)

我试图在timestamp列中添加天,但是在添加天之后,我丢失了原始timestamp列中的小时数。关于如何在最终输出中保留小时数有什么建议吗?

var Df = Seq(
(1, "2020-04-01 14:31:10",0),
(2, "2020-04-15 14:31:10",-1),
(3, "2020-04-16 03:31:10",3),
(6, "2020-03-01 14:31:10",30)    
 ).toDF("Id", "startDt", "Flag")

Df = (Df.withColumn("startDt",to_timestamp($"startDt")))

(Df.withColumn("newdate",when($"Flag".isNotNull && $"Flag" > 0 && $"Flag" <= 30
        && $"startDt".isNotNull, expr("date_add(startDt,Flag)").cast(TimestampType)).otherwise(lit(null))
                  )
).show()

预期结果:

+---+-------------------+----+-------------------+
| Id|            startDt|Flag|            newdate|
+---+-------------------+----+-------------------+
|  1|2020-04-01 14:31:10|   0|               null|
|  2|2020-04-15 14:31:10|  -1|               null|
|  3|2020-04-16 03:31:10|   3|2020-04-19 03:31:10|
|  6|2020-03-01 14:31:10|  30|2020-03-31 14:30:10|

相反,给定结果:

+---+-------------------+----+-------------------+
| Id|            startDt|Flag|            newdate|
+---+-------------------+----+-------------------+
|  1|2020-04-01 14:31:10|   0|               null|
|  2|2020-04-15 14:31:10|  -1|               null|
|  3|2020-04-16 03:31:10|   3|2020-04-19 00:00:00|
|  6|2020-03-01 14:31:10|  30|2020-03-31 00:00:00|

我可以使用自定义项获得所需的输出,但是否有任何内置函数来实现相同的输出?

val AddDaysToTimeStamp = udf((x: java.sql.Timestamp, y: Int) => {val result = new Timestamp(x.getTime() + TimeUnit.DAYS.toMillis(y)) result})
xmakbtuz

xmakbtuz1#

在添加天之后,我失去了原始时间戳列中的小时数
那是因为 date_add 总是回来 DateType (而不是 TimestampType ),即使它将任何一种类型作为输入。
如果不喜欢使用自定义项,可以使用spark api的几种方法:
方法1:转换到unix epoch时间并在秒内修改(*)

Df.
  withColumn("startDt", to_timestamp($"startDt")).
  withColumn(
    "newdate",
    when($"Flag".isNotNull && $"Flag" > 0 && $"Flag" <= 30 && $"startDt".isNotNull,
      to_timestamp(unix_timestamp($"startDt") + $"Flag" * 3600 * 24)
    )
  ).show
// +---+-------------------+----+-------------------+
// | Id|            startDt|Flag|            newdate|
// +---+-------------------+----+-------------------+
// |  1|2020-04-01 14:31:10|   0|               null|
// |  2|2020-04-15 14:31:10|  -1|               null|
// |  3|2020-04-16 03:31:10|   3|2020-04-19 03:31:10|
// |  6|2020-03-01 14:31:10|  30|2020-03-31 15:31:10|  <-- +1 hour due to daylight savings
// +---+-------------------+----+-------------------+

(*)如果加/减天数的周期与夏时制时间交叉,则小时值将移动。
方法2:拆分datetime字符串并分别修改date和time

Df.
  withColumn("splitTS", split($"startDt", "\\s+")).
  withColumn(
    "newdate",
    when($"Flag".isNotNull && $"Flag" > 0 && $"Flag" <= 30,
      concat(expr("date_add(to_date(splitTS[0]), Flag)"), lit(" "), $"splitTS"(1))
    )
  ).show
// +---+-------------------+----+--------------------+-------------------+
// | Id|            startDt|Flag|             splitTS|            newdate|
// +---+-------------------+----+--------------------+-------------------+
// |  1|2020-04-01 14:31:10|   0|[2020-04-01, 14:3...|               null|
// |  2|2020-04-15 14:31:10|  -1|[2020-04-15, 14:3...|               null|
// |  3|2020-04-16 03:31:10|   3|[2020-04-16, 03:3...|2020-04-19 03:31:10|
// |  6|2020-03-01 14:31:10|  30|[2020-03-01, 14:3...|2020-03-31 14:31:10|
// +---+-------------------+----+--------------------+-------------------+

请注意 startDt 如果需要,可以添加字符串(例如通过regex时间戳模式匹配)。

bakd9h0s

bakd9h0s2#

您可以向时间戳添加一个间隔,其中间隔是由 flag 列:

import org.apache.spark.sql.types._

val df2 = Df.withColumn(
    "newdate", 
    when(
        $"Flag".isNotNull && $"Flag" > 0 && $"Flag" <= 30 && $"startDt".isNotNull, 
        $"startDt" + concat($"flag".cast("string"), lit(" days")).cast(CalendarIntervalType)
    )
)

df2.show
+---+-------------------+----+-------------------+
| Id|            startDt|Flag|            newdate|
+---+-------------------+----+-------------------+
|  1|2020-04-01 14:31:10|   0|               null|
|  2|2020-04-15 14:31:10|  -1|               null|
|  3|2020-04-16 03:31:10|   3|2020-04-19 03:31:10|
|  6|2020-03-01 14:31:10|  30|2020-03-31 14:31:10|
+---+-------------------+----+-------------------+

相关问题