使用muliprocessing或hadoop加速大数据上的python脚本

uajslkp6  于 2021-05-30  发布在  Hadoop
关注(0)|答案(1)|浏览(343)

我正在处理一个大的csv数据文件,其中的文件是: user_id , timestamp , category ,我正在为每个用户的每个类别建立分数。我首先对csv文件进行分块,然后应用 groupby (在最后两个数字上) user_id )所以我可以存储总共100个包含用户组的文件,并将它们存储在hdf5存储区中。
然后我在我的存储上做一个大for循环,一个接一个地处理每个存储的文件。对于其中的每一个,我都按用户id分组,然后计算该用户的得分。然后我写一个输出csv,每个用户一行,包含所有的分数。
我注意到这个主循环在我的个人电脑上需要4个小时,我想加速它,因为它看起来完全可以并行。我该怎么办?我想到了 multiprocessing 或者 hadoop streaming ,什么是最好的?
以下是我的(简化)代码:

def sub_group_hash(x):
    return x['user_id'].str[-2:]

reader = read_csv('input.csv', chunksize=500000)                                  
with get_store('grouped_input.h5') as store:
    for chunk in reader:
        groups = chunk.groupby(sub_group_hash(chunk))
        for grp, grouped in groups:
            store.append('group_%s' % grp, grouped,
                 data_columns=['user_id','timestamp','category'])

with open('stats.csv','wb') as outfile:
    spamwriter = csv.writer(outfile)
    with get_store('grouped_input.h5') as store:
        for grp in store.keys(): #this is the loop I would like to parallelize
            grouped = store.select(grp).groupby('user_id')
            for user, user_group in grouped:
                output = my_function(user,user_group)
                spamwriter.writerow([user] + output)
qncylg1j

qncylg1j1#

我建议多线程。线程库非常简单直观。https://docs.python.org/3/library/threading.html#thread-对象
我有点不明白你的意思,你的主要循环是什么,但我假设它的所有上述过程。如果是这种情况,请将其包含到定义中,并使用

import thread
t1 = threading.thread(process, ("any", "inputs"))
t1.start()

一个像样的教程可以在这里找到。如果您对python足够熟悉并能够使用它,那么它还向您展示了一种更高级的线程技术。http://www.tutorialspoint.com/python/python_multithreading.htm
棘手的是,当您写入文件时,您不希望所有进程同时写入文件,但幸运的是,您可以创建一个带有锁的阻塞点。这个 acquire() 以及 release() 围绕此进程的函数将确保一次只写入一个线程。
还要注意你的电脑上有多少内核。如果你在电脑上运行的线程比内核多,那么每个线程都要等待cpu时间,而且你在速度上并没有得到多少提高。如果你创建了无限多的进程,你也可以很容易地用叉子炸你的电脑。

相关问题