Ranking data across multiple files using PySpark

Asked by 2lpgd968 on 2021-05-27, in Spark

I have stock data (6000+ tickers, 100+ GB) saved as HDF5 files.
Basically, I'm trying to translate this pandas code into PySpark. Ideally, I'd like both the values used for ranking and the ranks themselves saved to a file.

import pandas as pd

agg_df = pd.DataFrame()
for stock in stocks:
    df = pd.read_csv(stock)
    df = my_func(df)  # custom function whose output is used for ranking; for simplicity, think standard deviation
    agg_df = pd.concat([agg_df, df], axis=1)  # column-wise concat: one column per stock, aligned on the DateTime index

agg_df.rank(axis=1)  # rank across stocks at each timestamp; .to_csv() <- would like to save ranks for future use

Each data file has the same schema, e.g.:

Symbol   Open   High    Low  Close   Volume
DateTime
2010-09-13 09:30:00      A  29.23  29.25  29.17  29.25  17667.0
2010-09-13 09:31:00      A  29.26  29.34  29.25  29.33   5000.0
2010-09-13 09:32:00      A  29.31  29.36  29.31  29.36    600.0
2010-09-13 09:33:00      A  29.33  29.36  29.30  29.35   7300.0
2010-09-13 09:34:00      A  29.35  29.39  29.31  29.39   3222.0

Desired output (where each number is a rank):

A   AAPL MSFT ...etc
DateTime
2010-09-13 09:30:00   1   3    7    ...
2010-09-13 09:31:00   4   5    7    ...
2010-09-13 09:32:00   24  17   99   ...
2010-09-13 09:33:00   7   63   42   ...
2010-09-13 09:34:00   5   4    13   ...

I've read other answers about Window and pyspark.sql, but I can't figure out how to apply them to my case, because I need to aggregate row-wise across stocks before ranking (at least that's how it works in pandas).
Edit 1: After I read the data into an RDD with rdd = sc.parallelize(data.keys).map(data.read_data), the RDD becomes a PipelinedRDD, which has no .select() method. 0xdfdfdfdf's example has all the data in a single DataFrame, but I don't think appending everything into one DataFrame to do the computation is a good idea.
Result: finally solved it. There were two problems: reading the files and performing the computation.
Regarding reading the files, I initially used rdd = sc.parallelize(data.keys).map(data.read_data), which produced a PipelinedRDD, i.e. a collection of pandas DataFrames. These had to be converted to Spark DataFrames for the solution to work. In the end I converted my HDF5 files to Parquet and saved them to a separate folder (a rough sketch of that conversion is shown below). I then used

sqlContext = pyspark.sql.SQLContext(sc)      # requires `import pyspark.sql`; sc is the existing SparkContext
rdd_p = sqlContext.read.parquet(r"D:\parq")  # despite the name, this is a Spark DataFrame, not an RDD

to read all the files into a single DataFrame.
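
For reference, the HDF5-to-Parquet conversion mentioned above might look roughly like this (a minimal sketch in pandas; the folder paths and the HDF5 key 'data' are hypothetical and depend on how the files were written):

import glob
import os
import pandas as pd

hdf5_files = glob.glob(r"D:\hdf5\*.h5")   # hypothetical input folder with one HDF5 file per stock
out_dir = r"D:\parq"                      # folder later read by Spark

for path in hdf5_files:
    # Assumes each file stores a single DataFrame under the key 'data'.
    pdf = pd.read_hdf(path, key="data")
    name = os.path.splitext(os.path.basename(path))[0]
    # Writing Parquet from pandas requires pyarrow (or fastparquet).
    pdf.to_parquet(os.path.join(out_dir, name + ".parquet"))
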
The computation was then done as in the accepted answer. Many thanks to 0xdfdfdfdf for the help.
Bonus:
Discussion - https://chat.stackoverflow.com/rooms/214307/discussion-between-biarys-and-0xdfdfdfdf
0xdfdfdfdf's solution - https://gist.github.com/0xdfdfdfdf/a93a7e444803f606008c7422784d1

Answer #1, by x7yiwoj4

Indeed, window functions can do this. I've created a small mock dataset that should resemble yours.

from pyspark.sql import SparkSession

# Get the active SparkSession (it already exists in the pyspark shell or a notebook).
spark = SparkSession.builder.getOrCreate()

columns = ['DateTime', 'Symbol', 'Open', 'High', 'Low', 'Close', 'Volume']

data = [('2010-09-13 09:30:00','A',29.23,29.25,29.17,29.25,17667.0), 
        ('2010-09-13 09:31:00','A',29.26,29.34,29.25,29.33,5000.0),
        ('2010-09-13 09:32:00','A',29.31,29.36,29.31,29.36,600.0),
        ('2010-09-13 09:34:00','A',29.35,29.39,29.31,29.39,3222.0),
        ('2010-09-13 09:30:00','AAPL',39.23,39.25,39.17,39.25,37667.0), 
        ('2010-09-13 09:31:00','AAPL',39.26,39.34,39.25,39.33,3000.0),
        ('2010-09-13 09:32:00','AAPL',39.31,39.36,39.31,39.36,300.0),
        ('2010-09-13 09:33:00','AAPL',39.33,39.36,39.30,39.35,3300.0),
        ('2010-09-13 09:34:00','AAPL',39.35,39.39,39.31,39.39,4222.0),
        ('2010-09-13 09:34:00','MSFT',39.35,39.39,39.31,39.39,7222.0)]
df = spark.createDataFrame(data, columns)

Now, df.show() gives us this:

+-------------------+------+-----+-----+-----+-----+-------+
|           DateTime|Symbol| Open| High|  Low|Close| Volume|
+-------------------+------+-----+-----+-----+-----+-------+
|2010-09-13 09:30:00|     A|29.23|29.25|29.17|29.25|17667.0|
|2010-09-13 09:31:00|     A|29.26|29.34|29.25|29.33| 5000.0|
|2010-09-13 09:32:00|     A|29.31|29.36|29.31|29.36|  600.0|
|2010-09-13 09:34:00|     A|29.35|29.39|29.31|29.39| 3222.0|
|2010-09-13 09:30:00|  AAPL|39.23|39.25|39.17|39.25|37667.0|
|2010-09-13 09:31:00|  AAPL|39.26|39.34|39.25|39.33| 3000.0|
|2010-09-13 09:32:00|  AAPL|39.31|39.36|39.31|39.36|  300.0|
|2010-09-13 09:33:00|  AAPL|39.33|39.36| 39.3|39.35| 3300.0|
|2010-09-13 09:34:00|  AAPL|39.35|39.39|39.31|39.39| 4222.0|
|2010-09-13 09:34:00|  MSFT|39.35|39.39|39.31|39.39| 7222.0|
+-------------------+------+-----+-----+-----+-----+-------+

Below is the solution, which uses the window function rank() mentioned earlier. Some reshaping is needed, for which you can use the pivot() function.

from pyspark.sql.window import Window
import pyspark.sql.functions as f

result = (df
 .select(
     'DateTime',
     'Symbol',
     # rank symbols by Volume within each timestamp
     f.rank().over(Window().partitionBy('DateTime').orderBy('Volume')).alias('rank')
 )
 .groupby('DateTime')
 .pivot('Symbol')       # reshape: one column per symbol
 .agg(f.first('rank'))  # there is one rank per (DateTime, Symbol), so first() just picks it
 .orderBy('DateTime')
)

Calling result.show() will give you:

+-------------------+----+----+----+
|           DateTime|   A|AAPL|MSFT|
+-------------------+----+----+----+
|2010-09-13 09:30:00|   1|   2|null|
|2010-09-13 09:31:00|   2|   1|null|
|2010-09-13 09:32:00|   2|   1|null|
|2010-09-13 09:33:00|null|   1|null|
|2010-09-13 09:34:00|   1|   2|   3|
+-------------------+----+----+----+
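
Since the question mentions saving the ranks for future use, the pivoted result can then be written to disk, e.g. (a minimal sketch; the output paths are hypothetical):

# Parquet preserves the schema; CSV works too if you prefer plain text.
result.write.mode('overwrite').parquet(r"D:\ranks_parquet")
# or: result.write.mode('overwrite').option('header', 'true').csv(r"D:\ranks_csv")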

Make sure you understand the rank(), dense_rank() and row_number() functions, as they behave differently when they encounter ties within a given window - you can find an explanation here.
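
For example, a quick way to see the difference on the mock data above (a minimal sketch reusing df from the answer, with the same window as in the solution):

from pyspark.sql.window import Window
import pyspark.sql.functions as f

w = Window.partitionBy('DateTime').orderBy('Volume')

# With tied Volume values: rank() leaves gaps after ties, dense_rank() does not,
# and row_number() always assigns distinct consecutive numbers (arbitrarily among ties).
(df
 .select(
     'DateTime', 'Symbol', 'Volume',
     f.rank().over(w).alias('rank'),
     f.dense_rank().over(w).alias('dense_rank'),
     f.row_number().over(w).alias('row_number'))
 .orderBy('DateTime', 'Volume')
 .show())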
