使用多处理池读取csv并更改全局数组

ahy6op9u  于 2023-03-10  发布在  其他
关注(0)|答案(1)|浏览(174)

我准备了一个串行进程,该进程读取一个大型csv文件(步骤1),创建一些全局数组(步骤2),并根据csv文件的每个元素更改全局数组中的元素(步骤3)。
代码为:

import pandas as pd
import numpy as np

"""
Step 1
"""

file = 'filename.csv'

df = pd.read_csv(file, skiprows=6)
df.head()

"""
Step 2
"""

r_lim = np.array([0.0, 0.0001, 0.0002, 0.0003, 0.0004, 0.0005, 0.0006, 0.0007, 0.0008, 0.0009, 0.001, 0.0011, 0.0012, 0.0013, 0.0014, 0.0015, 0.0016, 0.0017, 0.0018, 0.0019, 0.002])
r_vol = np.array([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
total_vol = 0.0

"""
Step 3
"""

def calc():
    global total_vol
    for i in range(len(df)):
        x = df.iloc[i, 1]
        y = df.iloc[i, 2]
        vol = df.iloc[i, 0]
        r_loc = np.sqrt(x**2 + y**2)
        for y in range(len(r_vol)):
            r_0 = r_lim[y]
            r_1 = r_lim[y+1]
            if r_loc >= r_0 and r_loc < r_1:
                r_vol[y] += vol
                break
        total_vol += vol

calc()
print(r_vol)
print(total_vol)

在串行模式下,这个进程运行得很好。但是,我无法使用Pool将其设置为多处理模式。我想到的方法是:
1.设置全局变量,
1.跳过前6行,全局读取.csv文件
1.使用Pool运行calc()进程,相应地更改全局数组,然后将所有数组连接在一起。
我想的是可能的吗?每个并行处理器需要单独读取.csv文件吗?提前感谢您的帮助!

yc0p9oo0

yc0p9oo01#

你还需要这些东西:
1.跨进程共享r_limr_voltotal_vol,以便每个进程都可以看到它们。如果r_limr_vol的长度固定,则可以使用multiprocessing.Array。可以使用multiprocessing.Value来存储total_vol
1.使用锁来保护对r_voltotal_vol的修改。如果使用multiprocessing.array和multiprocessing.value来存储它们,则无需担心此问题,因为它们在默认情况下有自己的锁。
但是由于你的csv文件比较大,而且内容会被复制到每个进程中,所以你需要注意一下你的内存使用情况,如果你的内存使用超出了,你最好在每个进程中读取文件的不同部分。
此外,calc()中的实际计算似乎不是很繁重(只是计算欧氏距离吗?),所以我建议您使用ThreadPool而不是ProcessPool,因为它更容易在线程之间共享变量。您可以尝试使用concurrent.futures.ThreadPoolExecutor。

相关问题