如何使用python pandas处理传入的真实的时间数据

34gzjxbg  于 11个月前  发布在  Python
关注(0)|答案(3)|浏览(88)

使用pandas处理实时传入数据的最推荐/pythonic方式是什么?
每隔几秒钟,我就会收到一个数据点,格式如下:

{'time' :'2013-01-01 00:00:00', 'stock' : 'BLAH',
 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0}

字符串
我想把它附加到一个现有的DataFrame,然后对它运行一些分析。
问题是,仅仅用DataFrame.append追加行可能会导致所有复制的性能问题。

我尝试过的事情:

一些人建议预先分配一个大的DataFrame,并在数据进来时更新它:

In [1]: index = pd.DatetimeIndex(start='2013-01-01 00:00:00', freq='S', periods=5)

In [2]: columns = ['high', 'low', 'open', 'close']

In [3]: df = pd.DataFrame(index=t, columns=columns)

In [4]: df
Out[4]: 
                    high  low open close
2013-01-01 00:00:00  NaN  NaN  NaN   NaN
2013-01-01 00:00:01  NaN  NaN  NaN   NaN
2013-01-01 00:00:02  NaN  NaN  NaN   NaN
2013-01-01 00:00:03  NaN  NaN  NaN   NaN
2013-01-01 00:00:04  NaN  NaN  NaN   NaN

In [5]: data = {'time' :'2013-01-01 00:00:02', 'stock' : 'BLAH', 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0}

In [6]: data_ = pd.Series(data)

In [7]: df.loc[data['time']] = data_

In [8]: df
Out[8]: 
                    high  low open close
2013-01-01 00:00:00  NaN  NaN  NaN   NaN
2013-01-01 00:00:01  NaN  NaN  NaN   NaN
2013-01-01 00:00:02    4    3    2     1
2013-01-01 00:00:03  NaN  NaN  NaN   NaN
2013-01-01 00:00:04  NaN  NaN  NaN   NaN


另一种方法是构建一个字典列表,只需将传入的数据附加到一个列表中,然后将其分割成更小的DataFrame即可。

In [9]: ls = []

In [10]: for n in range(5):
   .....:     # Naive stuff ahead =)
   .....:     time = '2013-01-01 00:00:0' + str(n)
   .....:     d = {'time' : time, 'stock' : 'BLAH', 'high' : np.random.rand()*10, 'low' : np.random.rand()*10, 'open' : np.random.rand()*10, 'close' : np.random.rand()*10}
   .....:     ls.append(d)

In [11]: df = pd.DataFrame(ls[1:3]).set_index('time')

In [12]: df
Out[12]: 
                        close      high       low      open stock
time                                                             
2013-01-01 00:00:01  3.270078  1.008289  7.486118  2.180683  BLAH
2013-01-01 00:00:02  3.883586  2.215645  0.051799  2.310823  BLAH


或者类似的东西,也许处理输入多一点。

p1iqtdky

p1iqtdky1#

我将使用HDF 5/pytables如下:
1.保持数据作为一个python列表“尽可能长”。
1.将结果附加到该列表中。

  • 当它变得“大”时:
  • 使用panda io(和一个可追加的表格)推送到HDF 5商店。
  • 请清除列表。
  • 重复.

事实上,我定义的函数为每个“键”使用一个列表,以便您可以在同一过程中将多个DataFrame存储到HDF 5 Store。
我们定义了一个函数,您可以对每一行d调用该函数:

CACHE = {}
STORE = 'store.h5'   # Note: another option is to keep the actual file open

def process_row(d, key, max_len=5000, _cache=CACHE):
    """
    Append row d to the store 'key'.

    When the number of items in the key's cache reaches max_len,
    append the list of rows to the HDF5 store and clear the list.

    """
    # keep the rows for each key separate.
    lst = _cache.setdefault(key, [])
    if len(lst) >= max_len:
        store_and_clear(lst, key)
    lst.append(d)

def store_and_clear(lst, key):
    """
    Convert key's cache list to a DataFrame and append that to HDF5.
    """
    df = pd.DataFrame(lst)
    with pd.HDFStore(STORE) as store:
        store.append(key, df)
    lst.clear()

字符串

  • 注意事项:我们使用with语句在每次写入后自动关闭存储。保持存储打开可能会更快,但如果是这样的话,建议您定期刷新(关闭刷新)。另外请注意,使用集合deque而不是列表可能更易于阅读,但列表的性能在这里会稍微好一些。*

若要使用这个,请呼叫为:

process_row({'time' :'2013-01-01 00:00:00', 'stock' : 'BLAH', 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0},
            key="df")

  • 注意:“df”是在pytables存储中使用的存储密钥。*

作业完成后,请确保store_and_clear剩余缓存:

for k, lst in CACHE.items():  # you can instead use .iteritems() in python 2
    store_and_clear(lst, k)


现在,您可以通过以下方式获得完整的DataFrame:

with pd.HDFStore(STORE) as store:
    df = store["df"]                    # other keys will be store[key]

一些评论:

  • 5000是可以调整的,尝试用一些更小/更大的数字来适应你的需要。
  • 时间复杂度为O(len(df))。
  • 在你做统计或数据挖掘之前,你不需要Pandas,用最快的。
  • 此代码适用于多个关键字(数据点)。
  • 这是非常少的代码,我们停留在vanilla python列表,然后Pandas Dataframe ...

此外,要获取最新的读取,您可以定义一个get方法,该方法在阅读之前存储和清除数据。通过这种方式,您可以获取最新的数据:

def get_latest(key, _cache=CACHE):
    store_and_clear(_cache[key], key)
    with pd.HDFStore(STORE) as store:
        return store[key]


现在,当您使用以下命令访问时:

df = get_latest("df")


你会得到最新的“df”
另一个选项 * 稍微 * 更复杂:在vanilla pytables中定义一个自定义表,请参见教程。

  • 注意:要创建列描述符,您需要知道字段名。*
bis0qfac

bis0qfac2#

实际上,您要解决两个问题:捕获实时数据和分析该数据。第一个问题可以通过专门为此目的而设计的Python logging来解决。然后,另一个问题可以通过阅读同一个日志文件来解决。

2izufjch

2izufjch3#

这是一个老问题,但pyarrow可以处理以优化的方式将流写入文件和阅读它们。它的API可以轻松实现类似于HDF 5的解决方案。
写入文件:

import pyarrow as pa

d = {'time' :'2013-01-01 00:00:00', 'stock' : 'BLAH',
 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0}

table_schema = pa.Table.from_pylist([d]).schema
sink = pa.OSFile('bigfile.arrow', 'wb')

with pa.ipc.new_stream(sink, table_schema) as writer:
    writer.write_batch(pa.RecordBatch.from_pylist(chunk))
# where chunk is a list of dictionaries similar to d

字符串
你可以在不同的线程中读取你的文件:

with pa.OSFile('bigfile.arrow', 'rb') as source:
    df = pa.ipc.open_stream(source).read_pandas()


下面是一个功能齐全的示例,可以在任意给定的时间间隔内写入可自定义大小的块:

import pyarrow as pa
import tkinter as tk
from threading import Thread
from time import sleep

chunk_size = 10000  # number of rows received at each cycle
delay_in_s = 0.05   # time between chunks in s

arrow_file = 'bigfile.arrow'

d = {'time' :'2013-01-01 00:00:00', 'stock' : 'BLAH',
 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0, 'count': 0}

table_schema = pa.Table.from_pylist([d]).schema
sink = pa.OSFile(arrow_file, 'wb')

class MainWindow(tk.Tk):
    def __init__(self, **kwarg):
        super().__init__(**kwarg)
        self.lbl = tk.Label(self)
        self.lbl.pack(side='top', expand=True, fill='both')
        dump_btn = tk.Button(self, text='Dump df', command=self.dump_df)
        dump_btn.pack(side='top')
        self.dump_txt = tk.Text(self)
        self.dump_txt.pack(side='top', fill='both')
            
    def dump_df(self):
        with pa.OSFile(arrow_file, 'rb') as source:
            df = pa.ipc.open_stream(source).read_pandas()
            self.dump_txt.delete("1.0", "end")
            self.dump_txt.insert('end', df)

root = MainWindow()

def add_rows():
    i = 0
    with pa.ipc.new_stream(sink, table_schema) as writer:
        while True:
            row = [dict(d, count=i) for i in range(i, i + chunk_size)]
            root.lbl['text'] = f"Adding rows {i} to {i + chunk_size - 1}"
            i += chunk_size
            writer.write_batch(pa.RecordBatch.from_pylist(row))
            sleep(delay_in_s)

t = Thread(target=add_rows)
t.setDaemon(True)
t.start()

root.mainloop()


我能够写入超过15.000.000行(总共增加到~ 1.2GB),并在最后读取它们,只有轻微的延迟。

相关问题