pyspark更改日期在datetime列中

omqzjyyz  于 2023-03-01  发布在  Spark
关注(0)|答案(3)|浏览(153)

尝试更改日期时间列的日期的代码有什么问题

import pyspark
import pyspark.sql.functions as sf
import pyspark.sql.types as sparktypes
import datetime

sc = pyspark.SparkContext(appName="test")
sqlcontext = pyspark.SQLContext(sc)

rdd = sc.parallelize([('a',datetime.datetime(2014, 1, 9, 0, 0)),
                      ('b',datetime.datetime(2014, 1, 27, 0, 0)),
                      ('c',datetime.datetime(2014, 1, 31, 0, 0))])
testdf = sqlcontext.createDataFrame(rdd, ["id", "date"])

print(testdf.show())
print(testdf.printSchema())

给出测试 Dataframe :

+---+--------------------+
| id|                date|
+---+--------------------+
|  a|2014-01-09 00:00:...|
|  b|2014-01-27 00:00:...|
|  c|2014-01-31 00:00:...|
+---+--------------------+

root
 |-- id: string (nullable = true)
 |-- date: timestamp (nullable = true)

然后我定义一个udf来更改日期列的日期:

def change_day_(date, day):
    return date.replace(day=day)

change_day = sf.udf(change_day_, sparktypes.TimestampType())
testdf.withColumn("PaidMonth", change_day(testdf.date, 1)).show(1)

这将引发错误:

Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.col. Trace:
py4j.Py4JException: Method col([class java.lang.Integer]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:339)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
ikfrs5lh

ikfrs5lh1#

接收多个参数的udf被认为接收多个。“1”不是列。
这意味着您可以执行以下操作之一。或者按照注解中的建议将其设置为列:

testdf.withColumn("PaidMonth", change_day(testdf.date, lit(1))).show(1)

lit(1)是一列1
或者使原始函数返回高阶函数:

def change_day_(day):
    return lambda date: date.replace(day=day)

change_day = sf.udf(change_day_(1), sparktypes.TimestampType())
testdf.withColumn("PaidMonth", change_day(testdf.date)).show(1)

这基本上创建了一个用1替换的函数,因此可以接收一个整数。udf将应用于单个列。

6jjcrrmo

6jjcrrmo2#

感谢@ArthurTacca的评论,诀窍是这样使用pyspark.sql.functions.lit()函数:

testdf.withColumn("PaidMonth", change_day(testdf.date, sf.lit(1))).show()

欢迎选择答案!

o0lyfsai

o0lyfsai3#

我知道这有点晚了,但是如果其他人遇到这个问题,如果你喜欢内置函数,你可以使用date_trunc()沿着date_add()“来改变日期时间列的日期”。

# change to 1 as in the original question
testdf.withColumn("PaidMonth", f.date_trunc("mon", col("date")))
# change to another day eg. 6
testdf.withColumn("PaidMonth", f.date_add(f.date_trunc("mon", col("date")), 5))

last_day()date_sub()也可用于日期操作

相关问题