我有股票数据(超过6000+股,100+gb)保存为hdf5文件。
基本上,我正在尝试将这个pandas代码翻译成pyspark。理想情况下,我希望有用于排名的值,以及排名本身保存到一个文件。
agg_df = pd.DataFrame()
for stock in stocks:
df = pd.read_csv(stock)
df = my_func(df) # custom function, output of which will be used for ranking. For simplicity, can use standard deviation
agg_df = pd.concat([agg_df, df], ax=1) # row-wise concat
agg_df.rank() #.to_csv() <- would like to save ranks for future use
每个数据文件都有相同的架构,如:
Symbol Open High Low Close Volume
DateTime
2010-09-13 09:30:00 A 29.23 29.25 29.17 29.25 17667.0
2010-09-13 09:31:00 A 29.26 29.34 29.25 29.33 5000.0
2010-09-13 09:32:00 A 29.31 29.36 29.31 29.36 600.0
2010-09-13 09:33:00 A 29.33 29.36 29.30 29.35 7300.0
2010-09-13 09:34:00 A 29.35 29.39 29.31 29.39 3222.0
期望输出(其中每个数字是一个秩):
A AAPL MSFT ...etc
DateTime
2010-09-13 09:30:00 1 3 7 ...
2010-09-13 09:31:00 4 5 7 ...
2010-09-13 09:32:00 24 17 99 ...
2010-09-13 09:33:00 7 63 42 ...
2010-09-13 09:34:00 5 4 13 ...
我阅读了关于window和pyspark.sql的其他答案,但不知道如何将它们应用到我的案例中,因为我需要在排名前按行汇总这些答案(至少在pandas中是这样)
编辑1:在我将数据读取到rdd之后 rdd = sc.parallelize(data.keys).map(data.read_data)
,rdd变为pipelinerdd,它没有.select()方法。0xdfdf的示例包含一个Dataframe中的所有数据,但我不认为将所有内容附加到一个Dataframe中来进行计算是个好主意。
结果:终于解决了。有两个问题:读取文件和执行计算。
关于读取文件,我最初使用 rdd = sc.parallelize(data.keys).map(data.read_data)
这就产生了pipelinerdd,这是一个Dataframe的集合。这些需要转换为sparkDataframe,以便解决方案能够工作。最后我把我的hdf5文件转换成parquet,并把它们保存到一个单独的文件夹中。然后使用
sqlContext = pyspark.sql.SQLContext(sc)
rdd_p = sqlContext.read.parquet(r"D:\parq")
将所有文件读取到一个Dataframe。
然后根据接受的答案进行计算。非常感谢0xdff的帮助
额外费用:
讨论-https://chat.stackoverflow.com/rooms/214307/discussion-between-biarys-and-0xdfdfdfdf
0xDFDF解决方案-https://gist.github.com/0xdfdfdfdf/a93a7e444803f606008c7422784d1
1条答案
按热度按时间x7yiwoj41#
事实上,windows函数可以做到这一点。我已经创建了一个小的模拟数据集,它应该类似于您的。
现在,
df.show()
会给我们这个:下面是解决方案,它使用前面提到的窗口函数
rank()
. 需要一些转换,您可以使用pivot()
功能。通过呼叫
result.show()
您将得到:确保你了解
rank()
,dense_rank()
以及row_number()
函数,因为当它们在给定的窗口中遇到相等的数时,它们的行为不同-您可以在这里找到解释。