Pandas读取不断增长的CSV文件

gz5pxeao  于 2023-02-17  发布在  其他
关注(0)|答案(2)|浏览(134)

我有一个不断增长的CSV文件,我想定期阅读。我也只对新的值感兴趣。
我希望能做些像这样的事:

file_chunks = pd.read_csv('file.csv', chunksize=1)
while True:
    do_something(next(file_chunks))
    time.sleep(0.1)

在频率上,这比.csv文件增长的速度要快。但是,只要迭代器一次也没有返回值,它就会“中断”,不返回值,即使.csv文件在此期间已经增长。
有没有一种方法可以逐行读取不断增长的.csv文件?

eimct9ow

eimct9ow1#

你可以围绕它构建一个try: except:或者make和if语句,首先检查if file_chunks is not none。像这样,它不应该再坏了,他只在没有更多的块时睡觉。

while True:
    file_chunks = pd.read_csv('file.csv', chunksize=1)
    while True:
        try:
            do_something(next(file_chunks))
        except:
            time.sleep(0.1)
9rnv2umw

9rnv2umw2#

这在标准的csv模块中更容易实现,你可以编写自己的行迭代器来读取更新文件。这个生成器将以二进制模式读取,这样它就可以跟踪文件位置,在EOF处关闭文件,并轮询其大小以获取附加数据。如果读取器因为另一端还没有刷新而获得部分文件更新,这可能会失败。或者CSV单元格包含并嵌入了新行,这使得读者认为二进制模式换行符总是终止行的假设无效。

import csv
import time
import os
import threading
import random

def rolling_reader(filename, poll_period=.1, encoding="utf-8"):
    pos = 0
    while True:
        while True:
            try:
                if os.stat(filename).st_size > pos:
                    break
            except FileNotFoundError:
                pass
            time.sleep(poll_period)
        fp = open(filename, "rb")
        fp.seek(pos)
        for line in fp:
            if line.strip():
                yield line.decode("utf-8")
        pos = fp.tell()

# ---- TEST - thread updates test.csv periodically
class GenCSVThread(threading.Thread):

    def __init__(self, csv_name):
        super().__init__(daemon=True)
        self.csv_name = csv_name
        self.start()

    def run(self):
        val = 1
        while True:
            with open(self.csv_name, "a") as fp:
                for _ in range(random.randrange(4)):
                    fp.write(",".join(str(val) for _ in range(4)) + "\n")
                    val += 1
            time.sleep(random.random())

if os.path.exists("test.csv"):
    os.remove("test.csv")
test_gen = GenCSVThread("test.csv")

reader = csv.reader(rolling_reader("test.csv"))
for row in reader:
    print(row)

依赖于平台的更新将使用inotify之类的工具来触发文件关闭操作的读取,以降低部分数据的风险。

相关问题