我正在处理一个大的csv数据文件,其中的文件是: user_id
, timestamp
, category
,我正在为每个用户的每个类别建立分数。我首先对csv文件进行分块,然后应用 groupby
(在最后两个数字上) user_id
)所以我可以存储总共100个包含用户组的文件,并将它们存储在hdf5存储区中。
然后我在我的存储上做一个大for循环,一个接一个地处理每个存储的文件。对于其中的每一个,我都按用户id分组,然后计算该用户的得分。然后我写一个输出csv,每个用户一行,包含所有的分数。
我注意到这个主循环在我的个人电脑上需要4个小时,我想加速它,因为它看起来完全可以并行。我该怎么办?我想到了 multiprocessing
或者 hadoop streaming
,什么是最好的?
以下是我的(简化)代码:
def sub_group_hash(x):
return x['user_id'].str[-2:]
reader = read_csv('input.csv', chunksize=500000)
with get_store('grouped_input.h5') as store:
for chunk in reader:
groups = chunk.groupby(sub_group_hash(chunk))
for grp, grouped in groups:
store.append('group_%s' % grp, grouped,
data_columns=['user_id','timestamp','category'])
with open('stats.csv','wb') as outfile:
spamwriter = csv.writer(outfile)
with get_store('grouped_input.h5') as store:
for grp in store.keys(): #this is the loop I would like to parallelize
grouped = store.select(grp).groupby('user_id')
for user, user_group in grouped:
output = my_function(user,user_group)
spamwriter.writerow([user] + output)
1条答案
按热度按时间qncylg1j1#
我建议多线程。线程库非常简单直观。https://docs.python.org/3/library/threading.html#thread-对象
我有点不明白你的意思,你的主要循环是什么,但我假设它的所有上述过程。如果是这种情况,请将其包含到定义中,并使用
一个像样的教程可以在这里找到。如果您对python足够熟悉并能够使用它,那么它还向您展示了一种更高级的线程技术。http://www.tutorialspoint.com/python/python_multithreading.htm
棘手的是,当您写入文件时,您不希望所有进程同时写入文件,但幸运的是,您可以创建一个带有锁的阻塞点。这个
acquire()
以及release()
围绕此进程的函数将确保一次只写入一个线程。还要注意你的电脑上有多少内核。如果你在电脑上运行的线程比内核多,那么每个线程都要等待cpu时间,而且你在速度上并没有得到多少提高。如果你创建了无限多的进程,你也可以很容易地用叉子炸你的电脑。