pyspark to_timestamp不包括毫秒

ebdffaop  于 2023-05-28  发布在  Spark
关注(0)|答案(4)|浏览(100)

我尝试格式化时间戳列以包含毫秒,但没有成功。如何将我的时间格式化为这样-2019-01-04 11:09:21.152
我已经看过文档并遵循了SimpleDataTimeFormat,pyspark文档说to_timestamp函数正在使用它。
这是我的数据框。

+--------------------------+
|updated_date              |
+--------------------------+
|2019-01-04 11:09:21.152815|
+--------------------------+

我使用毫秒格式没有任何成功如下

>>> df.select('updated_date').withColumn("updated_date_col2", 
to_timestamp("updated_date", "YYYY-MM-dd HH:mm:ss:SSS")).show(1,False)
+--------------------------+-------------------+
|updated_date              |updated_date_col2  |
+--------------------------+-------------------+
|2019-01-04 11:09:21.152815|2019-01-04 11:09:21|
+--------------------------+-------------------+

我希望将updated_date_col2化为2019-01-04 11:09:21.152

qrjkbowd

qrjkbowd1#

我认为你可以使用UDF和Python的标准datetime模块如下。

import datetime
from pyspark.sql.functions import udf
from pyspark.sql.types import TimestampType

def _to_timestamp(s):
    return datetime.datetime.strptime(s, '%Y-%m-%d %H:%M:%S.%f')

udf_to_timestamp = udf(_to_timestamp, TimestampType())

df.select('updated_date').withColumn("updated_date_col2", udf_to_timestamp("updated_date")).show(1,False)
g52tjvyc

g52tjvyc2#

这不是使用to_timestamp的解决方案,但您可以轻松地将列保持为时间格式
下面的代码是将数字毫秒转换为时间戳的示例之一。

from datetime import datetime

ms = datetime.now().timestamp() # ex) ms = 1547521021.83301
df = spark.createDataFrame([(1, ms)], ['obs', 'time'])
df = df.withColumn('time', df.time.cast("timestamp"))
df.show(1, False) 

+---+--------------------------+
|obs|time                      |
+---+--------------------------+
|1  |2019-01-15 12:15:49.565263|
+---+--------------------------+

如果在JS中使用new Date().getTime()Date.now(),或者在Python中使用datetime.datetime.now().timestamp(),则可以获得数字毫秒。

to94eoyn

to94eoyn3#

原因是pyspark to_timestamp只解析到秒,而TimestampType能够保存毫秒。
以下解决方法可能有效:
如果时间戳模式包含S,则调用UDF以获取要在表达式中使用的字符串'INTERVAL MILLISECONDS'

ts_pattern = "YYYY-MM-dd HH:mm:ss:SSS"
my_col_name = "time_with_ms"

# get the time till seconds
df = df.withColumn(my_col_name, to_timestamp(df["updated_date_col2"],ts_pattern))

# add milliseconds as inteval
if 'S' in timestamp_pattern:
   df = df.withColumn(my_col_name, df[my_col_name] + expr("INTERVAL 256 MILLISECONDS"))

要获得间隔256毫秒,我们可以使用Java UDF:

df = df.withColumn(col_name, df[col_name] + expr(getIntervalStringUDF(df[my_col_name], ts_pattern)))

在UDF内部:getIntervalStringUDF(String timeString,String pattern)
1.使用SimpleDateFormat根据模式解析日期
1.使用模式“'INTERVAL'SSS'MILLISECONDS'”将格式化日期返回为字符串
1.解析/格式化异常时返回'INTERVAL 0 MILLISECONDS'

bihw5rsg

bihw5rsg4#

你可以直接施放力场,它会有毫秒。

df = df.withColumn('datetime', col('timestamp').cast(TimestampType()))

相关问题