我准备了一个串行进程,该进程读取一个大型csv文件(步骤1),创建一些全局数组(步骤2),并根据csv文件的每个元素更改全局数组中的元素(步骤3)。
代码为:
import pandas as pd
import numpy as np
"""
Step 1
"""
file = 'filename.csv'
df = pd.read_csv(file, skiprows=6)
df.head()
"""
Step 2
"""
r_lim = np.array([0.0, 0.0001, 0.0002, 0.0003, 0.0004, 0.0005, 0.0006, 0.0007, 0.0008, 0.0009, 0.001, 0.0011, 0.0012, 0.0013, 0.0014, 0.0015, 0.0016, 0.0017, 0.0018, 0.0019, 0.002])
r_vol = np.array([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
total_vol = 0.0
"""
Step 3
"""
def calc():
global total_vol
for i in range(len(df)):
x = df.iloc[i, 1]
y = df.iloc[i, 2]
vol = df.iloc[i, 0]
r_loc = np.sqrt(x**2 + y**2)
for y in range(len(r_vol)):
r_0 = r_lim[y]
r_1 = r_lim[y+1]
if r_loc >= r_0 and r_loc < r_1:
r_vol[y] += vol
break
total_vol += vol
calc()
print(r_vol)
print(total_vol)
在串行模式下,这个进程运行得很好。但是,我无法使用Pool将其设置为多处理模式。我想到的方法是:
1.设置全局变量,
1.跳过前6行,全局读取.csv文件
1.使用Pool运行calc()进程,相应地更改全局数组,然后将所有数组连接在一起。
我想的是可能的吗?每个并行处理器需要单独读取.csv文件吗?提前感谢您的帮助!
1条答案
按热度按时间yc0p9oo01#
你还需要这些东西:
1.跨进程共享
r_lim
、r_vol
和total_vol
,以便每个进程都可以看到它们。如果r_lim
和r_vol
的长度固定,则可以使用multiprocessing.Array。可以使用multiprocessing.Value来存储total_vol
1.使用锁来保护对
r_vol
和total_vol
的修改。如果使用multiprocessing.array和multiprocessing.value来存储它们,则无需担心此问题,因为它们在默认情况下有自己的锁。但是由于你的csv文件比较大,而且内容会被复制到每个进程中,所以你需要注意一下你的内存使用情况,如果你的内存使用超出了,你最好在每个进程中读取文件的不同部分。
此外,
calc()
中的实际计算似乎不是很繁重(只是计算欧氏距离吗?),所以我建议您使用ThreadPool而不是ProcessPool,因为它更容易在线程之间共享变量。您可以尝试使用concurrent.futures.ThreadPoolExecutor。