我有个奇怪的问题。我使用的是一个巨大的数据集,其中包含由单个字符串表示的日期和时间。这个数据可以很容易地使用datetime.strptime()
转换,但问题是数据是如此巨大,我需要使用pyspark来转换它。没问题,我想,我搜索了stackoverflow,看到了UDF!所以我做了一个不幸的是,存储在嵌套框架中的值与函数实际生成的值不匹配。我假设一个函数会在spark看到数据时逐行执行,但看起来并没有发生。
下面是我所拥有的(data是一个名为 result 的pyspark嵌套框架,我显示了前5行):
| 时间戳|节点id| Subsys|传感器|par|瓦尔_raw|瓦尔_hrf|
| --|--|--|--|--|--|--|
| 2018 - 01 - 01 00:00:06| 001e0610e532|化学感测|LPS25h| temp|-954| -9.54 |
| 2018 - 01 - 01 00:00:30| 001e0610e532|化学感测|LPS25h| temp|-954| -9.54 |
| 2018 - 01 - 01 00:00:54| 001e0610e532|化学感测|LPS25h| temp|-957| -9.57 |
| 2018 - 01 - 01 00:01:18| 001e0610e532|化学感测|LPS25h| temp|-961| -9.61 |
| 2018 - 01 - 01 00:01:42| 001e0610e532|化学感测|LPS25h| temp|-962| -9.62 |
为了将timestamp列转换为可操作的数据,我使用自定义函数将其转换为浮点数:
def timeswap(x:str):
print(x)
utime= datetime.timestamp(datetime.strptime(x, "%Y/%m/%d %H:%M:%S"))
print(utime)
return utime
我已确认此功能工作正常。所以我继续在整个列上运行它,并决定创建一个名为unixTime的新列来存储它:timeUDF = spark.udf.register('timeUDF',timeswap,FloatType()) result_conv = result.withColumn('unixTime', timeUDF('timestamp'))
看上去很有效。我花了几个星期的时间认为这是准确的,对数据运行算法,但最近发现数据以一种不应该的方式聚集;在同一天读了很多书所以我让斯帕克把专栏打印出来。这样做实际上会导致函数对每一行进行调用。我知道这是一件事,所以我把print语句作为一个健全的检查:result_conv.select('unixTime').head(5)
它输出这个# comments由我:
2018/01/01 00:00:06 #The original string date
1514782806.0 #the correct output from the function
2018/01/01 00:00:30
1514782830.0
2018/01/01 00:00:54
1514782854.0
2018/01/01 00:01:18
1514782878.0
2018/01/01 00:01:42
1514782902.0
[Row(unixTime=1514782848.0), #I don't know what this value is
Row(unixTime=1514782848.0),
Row(unixTime=1514782848.0),
Row(unixTime=1514782848.0),
Row(unixTime=1514782848.0)]
有人知道我错过了什么吗?我甚至已经确认,当运行超过5行时,行列表中的浮点数不存在,所以我不知道这个值是从哪里产生的,也不知道为什么它在各行之间重复。它既不是平均值,也不是中值(无论如何都不应该使用这些值),我不知道为什么它会重复(当我查看较长的行时,重复的数量不一致)。我真的很想避免把这个转换成PandasDF,然后再转换回SparkDF来做这个。底线是我需要将日期字符串转换为一个unixtime浮点数,这是唯一的每行传感器(因为它是在数据)。
感谢您的任何帮助!
1条答案
按热度按时间nnsrf1az1#
您不必使用UDF。你可以使用内置的pyspark函数来完成同样的任务。UDF是最后的手段。它会减慢你的程序。
我是这么做的我认为价值上的微小差异可能是由于时区问题。但我不确定。如果你能更好地说明问题,我可以提供更多帮助。例如:
(1514745006 (my computer) - 1514782806 (your computer)) = 37800 seconds = 10.5 hours
。所以这意味着你比我的时区早10.5小时。输出量: