在spark数据框中为多个spark列更新日期格式

nwlqm0z1  于 2021-05-29  发布在  Spark
关注(0)|答案(2)|浏览(474)

我有一个spark数据框,其中很少有列具有不同类型的日期格式。
为了处理这个问题,我编写了下面的代码,以保持所有日期列的格式类型一致。
由于date列的日期格式每次都可能发生更改,因此我在中定义了一组日期格式 dt_formats .

def to_timestamp_multiple(s: Column, formats: Seq[String]): Column = {
    coalesce(formats.map(fmt => to_timestamp(s, fmt)):_*)
}

val dt_formats= Seq("dd-MMM-yyyy", "MMM-dd-yyyy", "yyyy-MM-dd","MM/dd/yy","dd-MM-yy","dd-MM-yyyy","yyyy/MM/dd","dd/MM/yyyy")

val newDF =  df.withColumn("ETD1", date_format(to_timestamp_multiple($"ETD",Seq("dd-MMM-yyyy", dt_formats)).cast("date"), "yyyy-MM-dd")).drop("ETD").withColumnRenamed("ETD1","ETD")

但这里我必须创建一个新列,然后我必须删除旧列,然后重命名新列。这使得代码不必要非常笨拙,因此我想从这个代码得到覆盖。
我试图通过在下面编写一个scala函数来实现类似的功能,但是它抛出了一个异常 org.apache.spark.sql.catalyst.parser.ParseException: ,但我无法确定我应该做什么样的更改才能使其正常工作。。

val CleansedData= rawDF.selectExpr(rawDF.columns.map( 
x => { x match {
  case "ETA" => s"""date_format(to_timestamp_multiple($x, dt_formats).cast("date"), "yyyy-MM-dd") as ETA"""
  case _ => x
}  }   ) : _*)

因此寻求帮助。提前谢谢。

2uluyalo

2uluyalo1#

创建自定义项以便与一起使用 select . select方法获取列并生成另一个Dataframe。
还有,不用 coalesce ,简单地构建一个处理所有格式的解析器可能更简单。可以使用datetimeformatterbuilder进行此操作。

import java.time.format.DateTimeFormatter
import java.time.format.DateTimeFormatterBuilder
import org.apache.spark.sql.functions.udf
import java.time.LocalDate
import scala.util.Try
import java.sql.Date

val dtFormatStrings:Seq[String] = Seq("dd-MMM-yyyy", "MMM-dd-yyyy", "yyyy-MM-dd","MM/dd/yy","dd-MM-yy","dd-MM-yyyy","yyyy/MM/dd","dd/MM/yyyy")

// use foldLeft with appendOptional method, which for each format,
// returns a new builder with that additional possible format

val initBuilder = new DateTimeFormatterBuilder()
val builder: DateTimeFormatterBuilder = dtFormatStrings.foldLeft(initBuilder)(
  (b: DateTimeFormatterBuilder, s:String) => b.appendOptional(DateTimeFormatter.ofPattern(s)))
val formatter = builder.toFormatter()

// Create the UDF, which just takes
// any function returning a sql-compatible type (java.sql.Date, here)

def toTimeStamp2(dateString:String): Date = {
  val dateTry: Try[Date] = Try(java.sql.Date.valueOf(LocalDate.parse(dateString, formatter)))
  dateTry.toOption.getOrElse(null)
}

val timeConversionUdf = udf(toTimeStamp2 _)

// example DF and new DF
val df = Seq(("05/08/20"), ("2020-04-03"), ("unparseable")).toDF("ETD")
df.select(timeConversionUdf(col("ETD"))).toDF("ETD2").show

输出:

+----------+
|      ETD2|
+----------+
|2020-05-08|
|2020-04-03|
|      null|
+----------+

请注意,不可解析的值最终为null,如图所示。

dgenwo3n

dgenwo3n2#

尝试 withColumn(...) 同名合并如下-

val dt_formats= Seq("dd-MMM-yyyy", "MMM-dd-yyyy", "yyyy-MM-dd","MM/dd/yy","dd-MM-yy","dd-MM-yyyy","yyyy/MM/dd","dd/MM/yyyy")

val newDF =  df.withColumn("ETD", coalesce(dt_formats.map(fmt => to_date($"ETD", fmt)):_*))

相关问题