我有一个spark数据框,其中很少有列具有不同类型的日期格式。
为了处理这个问题,我编写了下面的代码,以保持所有日期列的格式类型一致。
由于date列的日期格式每次都可能发生更改,因此我在中定义了一组日期格式 dt_formats
.
def to_timestamp_multiple(s: Column, formats: Seq[String]): Column = {
coalesce(formats.map(fmt => to_timestamp(s, fmt)):_*)
}
val dt_formats= Seq("dd-MMM-yyyy", "MMM-dd-yyyy", "yyyy-MM-dd","MM/dd/yy","dd-MM-yy","dd-MM-yyyy","yyyy/MM/dd","dd/MM/yyyy")
val newDF = df.withColumn("ETD1", date_format(to_timestamp_multiple($"ETD",Seq("dd-MMM-yyyy", dt_formats)).cast("date"), "yyyy-MM-dd")).drop("ETD").withColumnRenamed("ETD1","ETD")
但这里我必须创建一个新列,然后我必须删除旧列,然后重命名新列。这使得代码不必要非常笨拙,因此我想从这个代码得到覆盖。
我试图通过在下面编写一个scala函数来实现类似的功能,但是它抛出了一个异常 org.apache.spark.sql.catalyst.parser.ParseException:
,但我无法确定我应该做什么样的更改才能使其正常工作。。
val CleansedData= rawDF.selectExpr(rawDF.columns.map(
x => { x match {
case "ETA" => s"""date_format(to_timestamp_multiple($x, dt_formats).cast("date"), "yyyy-MM-dd") as ETA"""
case _ => x
} } ) : _*)
因此寻求帮助。提前谢谢。
2条答案
按热度按时间2uluyalo1#
创建自定义项以便与一起使用
select
. select方法获取列并生成另一个Dataframe。还有,不用
coalesce
,简单地构建一个处理所有格式的解析器可能更简单。可以使用datetimeformatterbuilder进行此操作。输出:
请注意,不可解析的值最终为null,如图所示。
dgenwo3n2#
尝试
withColumn(...)
同名合并如下-