pyspark udf函数存储不正确的数据,尽管函数产生正确的结果

brccelvz  于 2023-10-15  发布在  Spark
关注(0)|答案(1)|浏览(102)

我有个奇怪的问题。我使用的是一个巨大的数据集,其中包含由单个字符串表示的日期和时间。这个数据可以很容易地使用datetime.strptime()转换,但问题是数据是如此巨大,我需要使用pyspark来转换它。没问题,我想,我搜索了stackoverflow,看到了UDF!所以我做了一个不幸的是,存储在嵌套框架中的值与函数实际生成的值不匹配。我假设一个函数会在spark看到数据时逐行执行,但看起来并没有发生。
下面是我所拥有的(data是一个名为 result 的pyspark嵌套框架,我显示了前5行):
| 时间戳|节点id| Subsys|传感器|par|瓦尔_raw|瓦尔_hrf|
| --|--|--|--|--|--|--|
| 2018 - 01 - 01 00:00:06| 001e0610e532|化学感测|LPS25h| temp|-954| -9.54 |
| 2018 - 01 - 01 00:00:30| 001e0610e532|化学感测|LPS25h| temp|-954| -9.54 |
| 2018 - 01 - 01 00:00:54| 001e0610e532|化学感测|LPS25h| temp|-957| -9.57 |
| 2018 - 01 - 01 00:01:18| 001e0610e532|化学感测|LPS25h| temp|-961| -9.61 |
| 2018 - 01 - 01 00:01:42| 001e0610e532|化学感测|LPS25h| temp|-962| -9.62 |
为了将timestamp列转换为可操作的数据,我使用自定义函数将其转换为浮点数:

def timeswap(x:str):
    print(x)
    utime= datetime.timestamp(datetime.strptime(x, "%Y/%m/%d %H:%M:%S"))
    print(utime)
    return utime

我已确认此功能工作正常。所以我继续在整个列上运行它,并决定创建一个名为unixTime的新列来存储它:
timeUDF = spark.udf.register('timeUDF',timeswap,FloatType()) result_conv = result.withColumn('unixTime', timeUDF('timestamp'))
看上去很有效。我花了几个星期的时间认为这是准确的,对数据运行算法,但最近发现数据以一种不应该的方式聚集;在同一天读了很多书所以我让斯帕克把专栏打印出来。这样做实际上会导致函数对每一行进行调用。我知道这是一件事,所以我把print语句作为一个健全的检查:result_conv.select('unixTime').head(5)
它输出这个# comments由我:

2018/01/01 00:00:06 #The original string date
1514782806.0 #the correct output from the function
2018/01/01 00:00:30
1514782830.0
2018/01/01 00:00:54
1514782854.0
2018/01/01 00:01:18
1514782878.0
2018/01/01 00:01:42
1514782902.0
[Row(unixTime=1514782848.0), #I don't know what this value is
 Row(unixTime=1514782848.0),
 Row(unixTime=1514782848.0),
 Row(unixTime=1514782848.0),
 Row(unixTime=1514782848.0)]

有人知道我错过了什么吗?我甚至已经确认,当运行超过5行时,行列表中的浮点数不存在,所以我不知道这个值是从哪里产生的,也不知道为什么它在各行之间重复。它既不是平均值,也不是中值(无论如何都不应该使用这些值),我不知道为什么它会重复(当我查看较长的行时,重复的数量不一致)。我真的很想避免把这个转换成PandasDF,然后再转换回SparkDF来做这个。底线是我需要将日期字符串转换为一个unixtime浮点数,这是唯一的每行传感器(因为它是在数据)。
感谢您的任何帮助!

nnsrf1az

nnsrf1az1#

您不必使用UDF。你可以使用内置的pyspark函数来完成同样的任务。UDF是最后的手段。它会减慢你的程序。
我是这么做的我认为价值上的微小差异可能是由于时区问题。但我不确定。如果你能更好地说明问题,我可以提供更多帮助。例如:(1514745006 (my computer) - 1514782806 (your computer)) = 37800 seconds = 10.5 hours。所以这意味着你比我的时区早10.5小时。

import pyspark.sql.functions as F
from pyspark import SparkContext, SQLContext

sc = SparkContext('local')
sqlContext = SQLContext(sc)
### This is very important setting if you want legacy behaviour
sqlContext.setConf("spark.sql.legacy.timeParserPolicy", "LEGACY")

data1 = [
    ["2018/01/01 00:00:06"],
    ["2018/01/01 00:00:30"],
    ["2018/01/01 00:00:54"],
    ["2018/01/01 00:01:18"],
    ["2018/01/01 00:01:42"],

]

df1Columns = ["time_col"]
df1 = sqlContext.createDataFrame(data=data1, schema=df1Columns)

# 1514782806.0  # the correct output from the function

df1 = df1.withColumn("integer_value", F.unix_timestamp(F.to_timestamp('time_col', 'yyyy/MM/dd HH:mm:ss')))
df1.show(n=100, truncate=False)

输出量:

+-------------------+-------------+
|time_col           |integer_value|
+-------------------+-------------+
|2018/01/01 00:00:06|1514745006   |
|2018/01/01 00:00:30|1514745030   |
|2018/01/01 00:00:54|1514745054   |
|2018/01/01 00:01:18|1514745078   |
|2018/01/01 00:01:42|1514745102   |
+-------------------+-------------+

相关问题