4 Answers

t2a7ltrp1#
@GP89 mentioned a good solution. Use a queue to send the write tasks to a dedicated process that has sole write access to the file. All the other workers have read-only access. This eliminates the conflicts. Here is an example that uses apply_async, but it would also work with map:
import multiprocessing as mp
import time

fn = 'c:/temp/temp.txt'

def worker(arg, q):
    '''stupidly simulates long running process'''
    start = time.perf_counter()  # time.clock() was removed in Python 3.8
    s = 'this is a test'
    txt = s
    for i in range(200000):
        txt += s
    done = time.perf_counter() - start
    with open(fn, 'rb') as f:
        size = len(f.read())
    res = 'Process' + str(arg), str(size), done
    q.put(res)
    return res

def listener(q):
    '''listens for messages on the q, writes to file. '''
    with open(fn, 'w') as f:
        while 1:
            m = q.get()
            if m == 'kill':
                f.write('killed')
                break
            f.write(str(m) + '\n')
            f.flush()

def main():
    # must use Manager queue here, or will not work
    manager = mp.Manager()
    q = manager.Queue()
    pool = mp.Pool(mp.cpu_count() + 2)

    # put listener to work first
    watcher = pool.apply_async(listener, (q,))

    # fire off workers
    jobs = []
    for i in range(80):
        job = pool.apply_async(worker, (i, q))
        jobs.append(job)

    # collect results from the workers through the pool result queue
    for job in jobs:
        job.get()

    # now we are done, kill the listener
    q.put('kill')
    pool.close()
    pool.join()

if __name__ == "__main__":
    main()
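As an aside on the map-style variant mentioned above (a sketch, not part of the original answer): map only passes a single argument per call, so starmap is the closer fit here. Inside main(), the apply_async loop and the job.get() loop could be replaced by one blocking call:

# starmap unpacks each (i, q) tuple into worker(i, q) and blocks until
# every worker has finished, so the separate job.get() loop is not needed
results = pool.starmap(worker, [(i, q) for i in range(80)])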
f8rj6qna2#
In my opinion, you need to use a Manager to temporarily store your results in a list and then write the results from the list to a file. In addition, use starmap to pass both the objects you want to process and the managed list. The first step is to build the parameters to pass to starmap, which include the managed list:

from multiprocessing import Manager
from multiprocessing import Pool
import pandas as pd

def worker(row, param):
    # do something here and then append it to row
    x = param**2
    row.append(x)

if __name__ == '__main__':
    pool_parameter = []  # list of objects to process
    with Manager() as mgr:
        row = mgr.list([])
        # build list of parameters to send to starmap
        params = []
        for param in pool_parameter:
            params.append([row, param])
        with Pool() as p:
            p.starmap(worker, params)
From this point on, you need to decide how to handle the list. If you have plenty of RAM and a huge data set, feel free to concatenate it with pandas; then you can save the result very easily as a csv or a pickle, as in the sketch below.
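A minimal sketch of that pandas step, assuming the managed list holds one scalar result per task; the column name and output path are illustrative, and a plain DataFrame constructor stands in for pd.concat. Run it inside the with Manager() block, right after starmap:

# copy the managed list into a regular list before the Manager shuts down,
# then hand it to pandas and save it
results = list(row)
df = pd.DataFrame(results, columns=['squared'])  # 'squared' is an illustrative name
df.to_csv('results.csv', index=False)            # or: df.to_pickle('results.pkl')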
vc9ivgsu3#
In response to the comment that this runs on a cluster, a simple option that does not rely on inter-process communication is to lock the memoization file using fcntl from the Python standard library.
This works on macOS, and I would hope it works on most unix systems, although it will need to be tested on your particular network storage implementation:
safe.py
import fcntl
import time

def myprint(*args):
    print(time.ctime(), *args)

def main():
    with open("safe.txt", "r+") as file:
        myprint("locking")
        # this will block (unless LOCK_EX | LOCK_NB is used)
        fcntl.lockf(file, fcntl.LOCK_EX)
        lines = file.readlines()
        # make race conditions more likely
        time.sleep(1)
        # "1" or one more than the previous entry
        newval = int(lines[-1]) + 1 if lines else 1
        print(newval)
        file.write(str(newval) + "\n")
        file.flush()
        myprint("unlocking")
        # LOCK_UN is the lockf() constant for releasing the lock
        fcntl.lockf(file, fcntl.LOCK_UN)

if __name__ == '__main__':
    main()
You can check that it works locally by running the following in a terminal:
touch safe.txt # this needs to already exist
for x in 1 2 3 4 5
do
    python safe.py &
done
wait # let the background runs finish before reading the file
cat safe.txt # should have 1-5 inside
If you combine this with multiprocessing, each process probably needs its own file descriptor (so run open() separately in each process), as in the sketch below.
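A minimal sketch of that combination, assuming the script above is saved as safe.py in the current directory; every child process calls safe.main() and therefore opens its own file descriptor before locking:

from multiprocessing import Process

import safe  # the safe.py script above

if __name__ == '__main__':
    # each child runs safe.main(), which opens the file itself,
    # so no file descriptor is shared between processes
    procs = [Process(target=safe.main) for _ in range(5)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()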
nnt7mjpx4#
I figured I would also post my solution to a slightly simpler problem, since this page keeps coming up whenever I search for my issue.
I based this loosely on @MikeHunter's solution above. The reason I needed something slightly different is that the arrays I want to write at the end of each process are fairly large, which means putting them into a queue, getting them out of it, and writing them from a different process would mean a lot of pickling and unpickling of oversized arrays. This does *not* handle the many sub-questions and sub-sub-questions the OP asked about, but it does handle the "title" of the question!
So what do I do?
Instead of sharing the array, I put a "0" in the queue and have a process get that "0". If there is nothing to get, the process idles until there is. If there is, the process starts writing and puts something back into the queue once it has finished writing. Importantly, given a potentially long write, q.get() sets timeout to None by default. If a write takes a long time and the timeout is set to something shorter, this will of course fail.

from multiprocessing import Process, Queue
import h5py
import numpy as np
from time import sleep, time

def func(i, q, filename, subfilename):
    # Reading from the subfile
    with h5py.File(subfilename, 'r') as ds:
        array = ds['array'][:]
    sleeptime = np.random.rand(1)*4 + 1
    sleep(sleeptime[0])
    # Print array loaded to compare to output in the summary file
    print(i, f'{sleeptime[0]:.3f}', array)
    # If anything is put in the queue it means that a process can start writing
    q.get()
    with h5py.File(filename, 'r+') as ds:
        ds['array'][i, :] = array
    # Indicate to the other processes that we are done writing to the summary
    # file
    q.put(0)

if __name__ == '__main__':
    N = 10
    Nsample = 5
    subfilenames = [f'sub_{i:>02d}.h5' for i in range(N)]
    for i in range(N):
        with h5py.File(subfilenames[i], 'w') as ds:
            disp = ds.create_dataset(
                'array', data=np.random.randint(0, 5, size=(5,)), dtype='f')
    filename = 'test.h5'
    with h5py.File(filename, 'w') as ds:
        disp = ds.create_dataset('array', (N, Nsample), dtype='f')

    # Create a queue to communicate the writing status between the processes
    q = Queue()
    # Put a 0 in the queue to indicate that a worker can start writing
    q.put(0)

    # Start the timer
    t0 = time()

    # Distribute the work to the workers
    processes = []
    print(" T sleeptime array", flush=True)
    print("-----------------------", flush=True)
    for i in range(N):
        p = Process(target=func, args=(
            i, q, filename, subfilenames[i]))
        p.start()
        processes.append(p)

    # Wait for the workers to finish
    for p in processes:
        p.join()

    # Print time taken
    print(f'Total time taken: {time()-t0:.2f} s')
If you save the script as hello.py, you can run it and sort its output by process index. Each line shows the process index, the time the process slept, and the array it loaded from its subfile, matching the header printed by the script.
Check the written HDF5 file and confirm that each row of the summary dataset 'array' holds the values the corresponding process printed; a quick way to do that is sketched below.
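A minimal way to inspect the result (this inspection snippet is an assumption, not part of the original answer, which may have used a tool such as h5dump instead):

import h5py

# print the summary dataset so each row can be compared with the line
# printed by the corresponding process
with h5py.File('test.h5', 'r') as ds:
    print(ds['array'][:])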
Note that there are better ways of doing this with mpi4py, but I needed my users not to have to worry about MPI.