使用pandas处理实时传入数据的最推荐/pythonic方式是什么?
每隔几秒钟,我就会收到一个数据点,格式如下:
{'time' :'2013-01-01 00:00:00', 'stock' : 'BLAH',
'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0}
字符串
我想把它附加到一个现有的DataFrame,然后对它运行一些分析。
问题是,仅仅用DataFrame.append追加行可能会导致所有复制的性能问题。
我尝试过的事情:
一些人建议预先分配一个大的DataFrame,并在数据进来时更新它:
In [1]: index = pd.DatetimeIndex(start='2013-01-01 00:00:00', freq='S', periods=5)
In [2]: columns = ['high', 'low', 'open', 'close']
In [3]: df = pd.DataFrame(index=t, columns=columns)
In [4]: df
Out[4]:
high low open close
2013-01-01 00:00:00 NaN NaN NaN NaN
2013-01-01 00:00:01 NaN NaN NaN NaN
2013-01-01 00:00:02 NaN NaN NaN NaN
2013-01-01 00:00:03 NaN NaN NaN NaN
2013-01-01 00:00:04 NaN NaN NaN NaN
In [5]: data = {'time' :'2013-01-01 00:00:02', 'stock' : 'BLAH', 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0}
In [6]: data_ = pd.Series(data)
In [7]: df.loc[data['time']] = data_
In [8]: df
Out[8]:
high low open close
2013-01-01 00:00:00 NaN NaN NaN NaN
2013-01-01 00:00:01 NaN NaN NaN NaN
2013-01-01 00:00:02 4 3 2 1
2013-01-01 00:00:03 NaN NaN NaN NaN
2013-01-01 00:00:04 NaN NaN NaN NaN
型
另一种方法是构建一个字典列表,只需将传入的数据附加到一个列表中,然后将其分割成更小的DataFrame即可。
In [9]: ls = []
In [10]: for n in range(5):
.....: # Naive stuff ahead =)
.....: time = '2013-01-01 00:00:0' + str(n)
.....: d = {'time' : time, 'stock' : 'BLAH', 'high' : np.random.rand()*10, 'low' : np.random.rand()*10, 'open' : np.random.rand()*10, 'close' : np.random.rand()*10}
.....: ls.append(d)
In [11]: df = pd.DataFrame(ls[1:3]).set_index('time')
In [12]: df
Out[12]:
close high low open stock
time
2013-01-01 00:00:01 3.270078 1.008289 7.486118 2.180683 BLAH
2013-01-01 00:00:02 3.883586 2.215645 0.051799 2.310823 BLAH
型
或者类似的东西,也许处理输入多一点。
3条答案
按热度按时间p1iqtdky1#
我将使用HDF 5/pytables如下:
1.保持数据作为一个python列表“尽可能长”。
1.将结果附加到该列表中。
事实上,我定义的函数为每个“键”使用一个列表,以便您可以在同一过程中将多个DataFrame存储到HDF 5 Store。
我们定义了一个函数,您可以对每一行
d
调用该函数:字符串
若要使用这个,请呼叫为:
型
作业完成后,请确保
store_and_clear
剩余缓存:型
现在,您可以通过以下方式获得完整的DataFrame:
型
一些评论:
len(df)
)。此外,要获取最新的读取,您可以定义一个get方法,该方法在阅读之前存储和清除数据。通过这种方式,您可以获取最新的数据:
型
现在,当您使用以下命令访问时:
型
你会得到最新的“df”
另一个选项 * 稍微 * 更复杂:在vanilla pytables中定义一个自定义表,请参见教程。
bis0qfac2#
实际上,您要解决两个问题:捕获实时数据和分析该数据。第一个问题可以通过专门为此目的而设计的Python logging来解决。然后,另一个问题可以通过阅读同一个日志文件来解决。
2izufjch3#
这是一个老问题,但
pyarrow
可以处理以优化的方式将流写入文件和阅读它们。它的API可以轻松实现类似于HDF 5的解决方案。写入文件:
字符串
你可以在不同的线程中读取你的文件:
型
下面是一个功能齐全的示例,可以在任意给定的时间间隔内写入可自定义大小的块:
型
我能够写入超过15.000.000行(总共增加到~ 1.2GB),并在最后读取它们,只有轻微的延迟。