csv 在python中高效处理约5000万个记录文件

c8ib6hqw  于 2022-12-06  发布在  Python
关注(0)|答案(3)|浏览(282)

我们有一个CSV格式的文件,其中包含大约4600万条记录。每条记录包含大约18个字段,其中一个字段是64字节的ID。我们还有一个文件,其中包含大约167 K个唯一ID。需要将与ID对应的记录提取出来。因此,我们已经编写了一个python程序,它将167 K个ID读入一个数组,并处理4600万条记录文件,检查这些记录中的每一条是否都存在ID。下面是代码片段:

import csv
...
csvReadHandler = csv.reader(inputFile, delimiter=chr(1))
csvWriteHandler = csv.writer(outputFile, delimiter=chr(1), lineterminator='\n')
for fieldAry in csvReadHandler:
    lineCounts['orig'] += 1
    if fieldAry[CUSTOMER_ID] not in idArray:
        csvWriteHandler.writerow(fieldAry)
        lineCounts['mod'] += 1

在一小组数据上测试了该程序,下面是处理时间:

lines: 117929 process time: 236.388447046 sec
lines: 145390 process time: 277.075321913 sec

我们已经开始运行该程序的4600万记录文件(这是约13 GB的大小)昨晚~3:00上午东部时间,现在是上午10点左右东部时间,它仍然在处理!
问题:
1.是否有更好的方法来处理这些记录以缩短处理时间?

  1. python是正确的选择吗?awk或其他工具会有帮助吗?
    1.我猜以下语句中的167 K数组上的64字节ID查找是罪魁祸首:
    if fieldAry[CUSTOMER_ID] not in idArray:
    有没有更好的选择?
    谢谢你,谢谢你
    更新:在具有EBS附加卷的EC2示例上处理此更新。
iezvtpos

iezvtpos1#

您应该必须使用set而不是list;* 在 * for循环之前执行:

idArray = set(idArray)

csvReadHandler = csv.reader(inputFile, delimiter=chr(1))
csvWriteHandler = csv.writer(outputFile, delimiter=chr(1), lineterminator='\n')
for fieldAry in csvReadHandler:
    lineCounts['orig'] += 1
    if fieldAry[CUSTOMER_ID] not in idArray:
        csvWriteHandler.writerow(fieldAry)
        lineCounts['mod'] += 1

看到令人难以置信的加速;仅仅因为选择了错误的数据结构就浪费了*天*的不必要的处理时间。
带有setin运算符具有
O(1)时间复杂度、而O(n)
list的时间复杂度。这听起来似乎“没什么大不了的”,但实际上它是脚本中的瓶颈。即使set的常数稍高O。所以你的代码在一个in操作上比实际需要多花了30000多的时间,如果在最佳版本中需要30秒,现在你在这一行上就花了10天。
请参见以下测试:我生成了100万个ID,并将其中的10000个放入另一个列表to_remove中,然后像您一样执行for循环,对每条记录执行in操作:

import random
import timeit

all_ids = [random.randint(1, 2**63) for i in range(1000000)]
to_remove = all_ids[:10000]
random.shuffle(to_remove)
random.shuffle(all_ids)

def test_set():
    to_remove_set = set(to_remove)
    for i in all_ids:
        if i in to_remove_set:
            pass

def test_list():
    for i in all_ids:
        if i in to_remove:
            pass

print('starting')
print('testing list', timeit.timeit(test_list, number=1))
print('testing set', timeit.timeit(test_set, number=1))

结果:

testing list 227.91903045598883
testing set 0.14897623099386692

set版本花费了149毫秒; list版本需要228秒。现在这些都是很小的数字:在你的例子中,你有5000万条输入记录,而我只有100万条;因此,需要将testing set时间乘以50:用你的数据集,大概需要7.5秒。
另一方面,列表版本则需要将该时间乘以50 * 17 --不仅输入记录数增加了50倍,而且需要匹配的记录数增加了17倍,因此我们得到227 * 50 * 17 = 192950。
所以你的算法花了2.2天来完成一些事情,而使用正确的数据结构可以在7.5秒内完成。当然,这并不意味着你可以在7.5秒内扫描 * 整个50 GB的文档 *,但它可能也不会超过2.2天。所以我们从:

2 days                           2.2 days 
 |reading and writing the files||------- doing id in list ------|

变成了

2 days            7.5 seconds (doing id in set)
 |reading and writing the files||
5lwkijsr

5lwkijsr2#

要加快速度,最简单的方法是使用一些分布式解决方案来并行化行处理。其中最简单的方法之一是使用multiprocessing. Pool。您应该这样做(不检查语法):

from multiprocessing import Pool

p = Pool(processes=4)
p.map(process_row, csvReadHandler)

尽管如此,python并不是进行这种批处理的最佳语言(主要是因为写入磁盘非常慢)。最好将所有磁盘写入管理(缓冲、队列等)留给linux内核,因此使用bash解决方案会更好。最有效的方法是将输入文件分成块,然后简单地执行反向grep来过滤id。

for file in $list_of_splitted_files; then
  cat $file | grep -v (id1|id2|id3|...) > $file.out
done;

如果以后需要合并,只需:

for file in $(ls *.out); then
  cat $file >> final_results.csv
done

注意事项:

  • 不知道使用所有id执行单个grep是否比遍历所有id并执行单个id grep更高效/更低效。
  • 在编写并行解决方案时,尝试读取/写入不同的文件,以最大限度地减少所有语言中的I/O瓶颈(所有线程尝试写入同一文件)。
  • 在你的代码中为每个处理部分设置计时器。这样你就可以看到哪个部分浪费了更多的时间。我真的推荐这个因为我有一个类似的程序要写,我认为处理部分是瓶颈(类似于你对id向量的比较),但实际上是I/O拖慢了所有的执行。
ovfsdjhp

ovfsdjhp3#

我认为最好使用数据库来解决这个问题,首先创建一个像MySql或其他任何数据库,然后将文件中的数据写入2个表,最后使用一个简单的SQL查询来选择行,比如:从表1中选择 *,其中id在(从表2中选择id)

相关问题